vellaveto_http_proxy/proxy/websocket/mod.rs
1// Copyright 2026 Paolo Vella
2// SPDX-License-Identifier: BUSL-1.1
3//
4// Use of this software is governed by the Business Source License
5// included in the LICENSE-BSL-1.1 file at the root of this repository.
6//
7// Change Date: Three years from the date of publication of this version.
8// Change License: MPL-2.0
9
10//! WebSocket transport for MCP JSON-RPC messages (SEP-1288).
11//!
12//! Implements a WebSocket reverse proxy that sits between MCP clients and
13//! an upstream MCP server. WebSocket messages (text frames) are parsed as
14//! JSON-RPC, classified via `vellaveto_mcp::extractor`, evaluated against
15//! loaded policies, and forwarded to the upstream server.
16//!
17//! Security invariants:
18//! - **Fail-closed**: Unparseable messages close the connection (code 1008).
19//! - **No binary frames**: Only text frames are accepted (code 1003 for binary).
20//! - **Session binding**: Each WS connection is bound to exactly one session.
21//! - **Canonicalization**: Re-serialized JSON forwarded (TOCTOU defense).
22
23use axum::{
24 extract::{
25 ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade},
26 ConnectInfo, State,
27 },
28 http::HeaderMap,
29 response::Response,
30};
31use futures_util::{SinkExt, StreamExt};
32use serde_json::{json, Value};
33use std::net::SocketAddr;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::sync::Mutex;
38use vellaveto_mcp::extractor::{self, MessageType};
39use vellaveto_mcp::inspection::{
40 inspect_for_injection, scan_notification_for_secrets, scan_parameters_for_secrets,
41 scan_response_for_secrets, scan_text_for_secrets, scan_tool_descriptions,
42 scan_tool_descriptions_with_scanner,
43};
44use vellaveto_mcp::mediation::{
45 build_acis_envelope_with_security_context, build_secondary_acis_envelope_with_security_context,
46 mediate_with_security_context,
47};
48use vellaveto_mcp::output_validation::ValidationResult;
49use vellaveto_types::acis::{AcisDecisionEnvelope, DecisionOrigin};
50use vellaveto_types::{Action, EvaluationContext, RuntimeSecurityContext, Verdict};
51
52use super::auth::{validate_agent_identity, validate_api_key, validate_oauth};
53use super::call_chain::{
54 check_privilege_escalation, sync_session_call_chain_from_headers, take_tracked_tool_call,
55 track_pending_tool_call,
56};
57use super::origin::validate_origin;
58use super::ProxyState;
59use crate::proxy_metrics::record_dlp_finding;
60
61const INVALID_PRESENTED_APPROVAL_REASON: &str = "Supplied approval is not valid for this action";
62
63/// Configuration for WebSocket transport.
64#[derive(Debug, Clone)]
65pub struct WebSocketConfig {
66 /// Maximum message size in bytes (default: 1 MB).
67 pub max_message_size: usize,
68 /// Idle timeout in seconds — close connection after no message activity (default: 300s).
69 /// SECURITY (FIND-R182-001): True idle timeout that resets on every message.
70 pub idle_timeout_secs: u64,
71 /// Maximum messages per second per connection for client-to-upstream (default: 100).
72 pub message_rate_limit: u32,
73 /// Maximum messages per second per connection for upstream-to-client (default: 500).
74 /// SECURITY (FIND-R46-WS-003): Rate limits on the upstream→client direction prevent
75 /// a malicious upstream from flooding the client with responses.
76 pub upstream_rate_limit: u32,
77}
78
79impl Default for WebSocketConfig {
80 fn default() -> Self {
81 Self {
82 max_message_size: 1_048_576,
83 idle_timeout_secs: 300,
84 message_rate_limit: 100,
85 upstream_rate_limit: 500,
86 }
87 }
88}
89
90/// WebSocket close codes per RFC 6455.
91const CLOSE_POLICY_VIOLATION: u16 = 1008;
92const CLOSE_UNSUPPORTED_DATA: u16 = 1003;
93/// Close code for oversized messages. Used by axum's `max_message_size`
94/// automatically; kept here for documentation and test assertions.
95#[cfg(test)]
96const CLOSE_MESSAGE_TOO_BIG: u16 = 1009;
97const CLOSE_NORMAL: u16 = 1000;
98
99/// Global WebSocket metrics counters.
100static WS_CONNECTIONS_TOTAL: AtomicU64 = AtomicU64::new(0);
101static WS_MESSAGES_TOTAL: AtomicU64 = AtomicU64::new(0);
102
103/// Record WebSocket connection metric.
104fn record_ws_connection() {
105 // SECURITY (FIND-R182-003): Use saturating arithmetic to prevent overflow.
106 // SECURITY (CA-005): SeqCst for security-adjacent metrics counters.
107 let _ = WS_CONNECTIONS_TOTAL.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
108 Some(v.saturating_add(1))
109 });
110 metrics::counter!("vellaveto_ws_connections_total").increment(1);
111}
112
113/// Record WebSocket message metric.
114fn record_ws_message(direction: &str) {
115 // SECURITY (FIND-R182-003): Use saturating arithmetic to prevent overflow.
116 // SECURITY (CA-005): SeqCst for security-adjacent metrics counters.
117 let _ = WS_MESSAGES_TOTAL.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
118 Some(v.saturating_add(1))
119 });
120 metrics::counter!(
121 "vellaveto_ws_messages_total",
122 "direction" => direction.to_string()
123 )
124 .increment(1);
125}
126
127/// Get current connection count (for testing).
128#[cfg(test)]
129pub(crate) fn ws_connections_count() -> u64 {
130 WS_CONNECTIONS_TOTAL.load(Ordering::SeqCst)
131}
132
133/// Get current message count (for testing).
134#[cfg(test)]
135pub(crate) fn ws_messages_count() -> u64 {
136 WS_MESSAGES_TOTAL.load(Ordering::SeqCst)
137}
138
139#[derive(Clone)]
140struct WsHandshakeSecurity {
141 oauth_claims: Option<crate::proxy::auth::OAuthValidationEvidence>,
142 headers: HeaderMap,
143}
144
145fn build_ws_runtime_security_context(
146 msg: &Value,
147 action: &Action,
148 headers: &HeaderMap,
149 inputs: super::helpers::TransportSecurityInputs<'_>,
150) -> Option<RuntimeSecurityContext> {
151 super::helpers::build_runtime_security_context(msg, action, headers, inputs)
152}
153
154fn refresh_ws_acis_envelope(
155 envelope: &AcisDecisionEnvelope,
156 action: &Action,
157 verdict: &Verdict,
158 origin: DecisionOrigin,
159 session_id: &str,
160 eval_ctx: &EvaluationContext,
161 security_context: Option<&RuntimeSecurityContext>,
162) -> AcisDecisionEnvelope {
163 build_acis_envelope_with_security_context(
164 &envelope.decision_id,
165 action,
166 verdict,
167 origin,
168 "websocket",
169 &envelope.findings,
170 envelope.evaluation_us,
171 Some(session_id),
172 None,
173 Some(eval_ctx),
174 security_context,
175 )
176}
177
178use vellaveto_types::is_unicode_format_char as is_unicode_format_char_ws;
179
180/// Query parameters for the WebSocket upgrade endpoint.
181#[derive(Debug, serde::Deserialize, Default)]
182#[serde(deny_unknown_fields)]
183pub struct WsQueryParams {
184 /// Optional session ID for session resumption.
185 #[serde(default)]
186 pub session_id: Option<String>,
187}
188
189/// Handle WebSocket upgrade request at `/mcp/ws`.
190///
191/// Authenticates the request, validates origin, creates/resumes a session,
192/// and upgrades the HTTP connection to a WebSocket.
193pub async fn handle_ws_upgrade(
194 State(state): State<ProxyState>,
195 ConnectInfo(addr): ConnectInfo<SocketAddr>,
196 headers: HeaderMap,
197 query: axum::extract::Query<WsQueryParams>,
198 ws: WebSocketUpgrade,
199) -> Response {
200 // 1. Validate origin (CSRF / DNS rebinding defense)
201 if let Err(resp) = validate_origin(&headers, &state.bind_addr, &state.allowed_origins) {
202 return resp;
203 }
204
205 // 2. Authenticate before upgrade (API key or OAuth)
206 if let Err(resp) = validate_api_key(&state, &headers) {
207 return resp;
208 }
209
210 // SECURITY (FIND-R53-WS-001): Validate OAuth token at upgrade time.
211 // Parity with HTTP POST (handlers.rs:154) and GET (handlers.rs:2864).
212 // Without this, WS connections bypass token expiry checks.
213 let oauth_claims = match validate_oauth(
214 &state,
215 &headers,
216 "GET",
217 &super::auth::build_effective_request_uri(
218 &headers,
219 state.bind_addr,
220 &axum::http::Uri::from_static("/mcp/ws"),
221 false,
222 ),
223 query.session_id.as_deref(),
224 )
225 .await
226 {
227 Ok(claims) => claims,
228 Err(response) => return response,
229 };
230
231 // SECURITY (FIND-R53-WS-002, FIND-R159-003): Validate agent identity at upgrade time.
232 // Parity with HTTP POST (handlers.rs:160) and GET (handlers.rs:2871).
233 // FIND-R159-003: Identity MUST be stored in session (was previously discarded with `_`).
234 let agent_identity = match validate_agent_identity(&state, &headers).await {
235 Ok(identity) => identity,
236 Err(response) => return response,
237 };
238
239 // SECURITY (FIND-R55-WS-004, FIND-R81-001): Validate session_id length and
240 // control/format characters from query parameter. Parity with HTTP POST/GET
241 // handlers (handlers.rs:154, handlers.rs:2928) which reject control chars.
242 // SECURITY (FIND-R81-WS-001): Also reject Unicode format characters (zero-width,
243 // bidi overrides, BOM) that can bypass string-based security checks.
244 let ws_session_id = query.session_id.as_deref().filter(|id| {
245 !id.is_empty()
246 && id.len() <= 128
247 && !id
248 .chars()
249 .any(|c| c.is_control() || is_unicode_format_char_ws(c))
250 });
251
252 // 3. Get or create session
253 let session_id = state.sessions.get_or_create(ws_session_id);
254
255 // SECURITY (FIND-R53-WS-003): Session ownership binding — prevent session fixation.
256 // Parity with HTTP GET (handlers.rs:2914-2953).
257 if let Some(ref claims) = oauth_claims {
258 if let Some(mut session) = state.sessions.get_mut(&session_id) {
259 match &session.oauth_subject {
260 Some(owner) if owner != &claims.sub => {
261 tracing::warn!(
262 session_id = %session_id,
263 owner = %owner,
264 requester = %claims.sub,
265 "WS upgrade rejected: session owned by different principal"
266 );
267 return axum::response::IntoResponse::into_response((
268 axum::http::StatusCode::FORBIDDEN,
269 axum::Json(json!({
270 "error": "Session belongs to another principal"
271 })),
272 ));
273 }
274 None => {
275 // Bind session to this OAuth subject
276 session.oauth_subject = Some(claims.sub.clone());
277 // SECURITY (FIND-R73-SRV-006): Store token expiry, matching
278 // HTTP POST handler pattern to enforce token lifetime.
279 if claims.exp > 0 {
280 session.token_expires_at = Some(claims.exp);
281 }
282 }
283 _ => {
284 // Already owned by this principal — use earliest expiry
285 // SECURITY (FIND-R73-SRV-006): Parity with HTTP POST handler
286 // (R23-PROXY-6) — prevent long-lived tokens from extending
287 // sessions originally bound to short-lived tokens.
288 if claims.exp > 0 {
289 session.token_expires_at = Some(
290 session
291 .token_expires_at
292 .map_or(claims.exp, |existing| existing.min(claims.exp)),
293 );
294 }
295 }
296 }
297 }
298 }
299
300 // SECURITY (FIND-R159-003): Store agent identity in session — parity with HTTP
301 // POST (handlers.rs:295-298) and GET (handlers.rs:3641-3643). Without this,
302 // ABAC policies referencing agent_identity would evaluate against None for
303 // WebSocket connections, creating a policy bypass.
304 if let Some(ref identity) = agent_identity {
305 if let Some(mut session) = state.sessions.get_mut(&session_id) {
306 session.agent_identity = Some(identity.clone());
307 }
308 } else if let Some(ref oauth_evidence) = oauth_claims {
309 match oauth_evidence.projected_agent_identity() {
310 Ok(Some(identity)) => {
311 if let Some(mut session) = state.sessions.get_mut(&session_id) {
312 session.agent_identity.get_or_insert(identity);
313 }
314 }
315 Ok(None) => {}
316 Err(error) => {
317 tracing::warn!(
318 "WebSocket transport identity claims validation failed: {}",
319 error
320 );
321 return axum::response::IntoResponse::into_response((
322 axum::http::StatusCode::UNAUTHORIZED,
323 axum::Json(json!({
324 "error": "Invalid authorization token"
325 })),
326 ));
327 }
328 }
329 }
330
331 // SECURITY (FIND-R46-006): Validate and extract call chain from upgrade headers.
332 // The call chain is synced once during upgrade and reused for all messages
333 // in this WebSocket connection.
334 if let Err(reason) = super::call_chain::validate_call_chain_header(&headers, &state.limits) {
335 tracing::warn!(
336 session_id = %session_id,
337 "WS upgrade rejected: invalid call chain header: {}",
338 reason
339 );
340 return axum::response::IntoResponse::into_response((
341 axum::http::StatusCode::BAD_REQUEST,
342 axum::Json(json!({
343 "error": "Invalid request"
344 })),
345 ));
346 }
347 sync_session_call_chain_from_headers(
348 &state.sessions,
349 &session_id,
350 &headers,
351 state.call_chain_hmac_key.as_ref(),
352 &state.limits,
353 );
354
355 let ws_config = state.ws_config.clone().unwrap_or_default();
356
357 // Phase 28: Extract W3C Trace Context from the HTTP upgrade request headers.
358 // The trace_id is used for correlating all audit entries during this WS session.
359 let trace_ctx = super::trace_propagation::extract_trace_context(&headers);
360 let ws_trace_id = trace_ctx
361 .trace_id
362 .clone()
363 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string().replace('-', ""));
364
365 tracing::info!(
366 session_id = %session_id,
367 trace_id = %ws_trace_id,
368 peer = %addr,
369 "WebSocket upgrade accepted"
370 );
371
372 // 4. Configure and upgrade
373 ws.max_message_size(ws_config.max_message_size)
374 .on_upgrade(move |socket| {
375 handle_ws_connection(
376 socket,
377 state,
378 session_id,
379 ws_config,
380 addr,
381 ws_trace_id,
382 WsHandshakeSecurity {
383 oauth_claims,
384 headers,
385 },
386 )
387 })
388}
389
390/// Handle an established WebSocket connection.
391///
392/// Establishes an upstream WS connection, then relays messages bidirectionally
393/// with policy enforcement on client→upstream messages and DLP/injection
394/// scanning on upstream→client messages.
395async fn handle_ws_connection(
396 client_ws: WebSocket,
397 state: ProxyState,
398 session_id: String,
399 ws_config: WebSocketConfig,
400 peer_addr: SocketAddr,
401 trace_id: String,
402 handshake_security: WsHandshakeSecurity,
403) {
404 record_ws_connection();
405 let start = std::time::Instant::now();
406 tracing::debug!(
407 session_id = %session_id,
408 trace_id = %trace_id,
409 "WebSocket connection established with trace context"
410 );
411
412 // Connect to upstream — use gateway default backend if configured
413 let upstream_url = if let Some(ref gw) = state.gateway {
414 match gw.route("") {
415 Some(d) => convert_to_ws_url(&d.upstream_url),
416 None => {
417 tracing::error!(session_id = %session_id, "No healthy upstream for WebSocket");
418 let (mut client_sink, _) = client_ws.split();
419 let _ = client_sink
420 .send(Message::Close(Some(CloseFrame {
421 code: CLOSE_POLICY_VIOLATION,
422 reason: "No healthy upstream available".into(),
423 })))
424 .await;
425 return;
426 }
427 }
428 } else {
429 convert_to_ws_url(&state.upstream_url)
430 };
431 let upstream_ws = match connect_upstream_ws(&upstream_url).await {
432 Ok(ws) => ws,
433 Err(e) => {
434 tracing::error!(
435 session_id = %session_id,
436 "Failed to connect to upstream WebSocket: {}",
437 e
438 );
439 // Send close frame to client
440 let (mut client_sink, _) = client_ws.split();
441 let _ = client_sink
442 .send(Message::Close(Some(CloseFrame {
443 code: CLOSE_POLICY_VIOLATION,
444 reason: "Upstream connection failed".into(),
445 })))
446 .await;
447 return;
448 }
449 };
450
451 let (client_sink, client_stream) = client_ws.split();
452 let (upstream_sink, upstream_stream) = upstream_ws.split();
453
454 // Wrap sinks in Arc<Mutex> for shared access
455 let client_sink = Arc::new(Mutex::new(client_sink));
456 let upstream_sink = Arc::new(Mutex::new(upstream_sink));
457
458 // Rate limiter state: track messages in the current second window
459 let rate_counter = Arc::new(AtomicU64::new(0));
460 let rate_window_start = Arc::new(std::sync::Mutex::new(std::time::Instant::now()));
461
462 // SECURITY (FIND-R46-WS-003): Separate rate limiter for upstream→client direction
463 let upstream_rate_counter = Arc::new(AtomicU64::new(0));
464 let upstream_rate_window_start = Arc::new(std::sync::Mutex::new(std::time::Instant::now()));
465
466 let idle_timeout = Duration::from_secs(ws_config.idle_timeout_secs);
467
468 // SECURITY (FIND-R182-001): Shared last-activity tracker so idle timeout resets
469 // on every message (true idle detection, not max-lifetime).
470 let last_activity = Arc::new(AtomicU64::new(0));
471 let connection_epoch = std::time::Instant::now();
472
473 // Client → Vellaveto → Upstream relay
474 let client_to_upstream = {
475 let state = state.clone();
476 let session_id = session_id.clone();
477 let client_sink = client_sink.clone();
478 let upstream_sink = upstream_sink.clone();
479 let rate_counter = rate_counter.clone();
480 let rate_window_start = rate_window_start.clone();
481 let ws_config = ws_config.clone();
482 let last_activity = last_activity.clone();
483 let handshake_security = handshake_security.clone();
484
485 relay_client_to_upstream(
486 client_stream,
487 client_sink,
488 upstream_sink,
489 state,
490 session_id,
491 ws_config,
492 rate_counter,
493 rate_window_start,
494 last_activity,
495 connection_epoch,
496 handshake_security.oauth_claims,
497 handshake_security.headers,
498 )
499 };
500
501 // Upstream → Vellaveto → Client relay
502 let upstream_to_client = {
503 let state = state.clone();
504 let session_id = session_id.clone();
505 let client_sink = client_sink.clone();
506 let ws_config = ws_config.clone();
507 let last_activity = last_activity.clone();
508
509 relay_upstream_to_client(
510 upstream_stream,
511 client_sink,
512 state,
513 session_id,
514 ws_config,
515 upstream_rate_counter,
516 upstream_rate_window_start,
517 last_activity,
518 connection_epoch,
519 )
520 };
521
522 // SECURITY (FIND-R182-001): True idle timeout — check periodically and
523 // close only if no message activity since last check.
524 let idle_check = {
525 let session_id = session_id.clone();
526 let last_activity = last_activity.clone();
527 async move {
528 // Check every 10% of idle timeout (min 1s) for responsive detection.
529 let check_interval = Duration::from_secs((ws_config.idle_timeout_secs / 10).max(1));
530 let mut interval = tokio::time::interval(check_interval);
531 interval.tick().await; // first tick is immediate, skip it
532 loop {
533 interval.tick().await;
534 // SECURITY (R251-WS-1): SeqCst for cross-thread session timeout enforcement.
535 let last_ms = last_activity.load(Ordering::SeqCst);
536 // SECURITY (FIND-R190-002): Use saturating_sub to prevent underflow.
537 let elapsed_since_activity =
538 (connection_epoch.elapsed().as_millis() as u64).saturating_sub(last_ms);
539 if elapsed_since_activity >= idle_timeout.as_millis() as u64 {
540 tracing::info!(
541 session_id = %session_id,
542 idle_secs = elapsed_since_activity / 1000,
543 "WebSocket idle timeout ({}s), closing",
544 ws_config.idle_timeout_secs
545 );
546 break;
547 }
548 }
549 }
550 };
551
552 // Run both relay loops with idle timeout
553 tokio::select! {
554 _ = client_to_upstream => {
555 tracing::debug!(session_id = %session_id, "Client stream ended");
556 }
557 _ = upstream_to_client => {
558 tracing::debug!(session_id = %session_id, "Upstream stream ended");
559 }
560 _ = idle_check => {}
561 }
562
563 // Clean shutdown: close both sides
564 {
565 let mut sink = client_sink.lock().await;
566 let _ = sink
567 .send(Message::Close(Some(CloseFrame {
568 code: CLOSE_NORMAL,
569 reason: "Session ended".into(),
570 })))
571 .await;
572 }
573 {
574 let mut sink = upstream_sink.lock().await;
575 let _ = sink.close().await;
576 }
577
578 let duration = start.elapsed();
579 metrics::histogram!("vellaveto_ws_connection_duration_seconds").record(duration.as_secs_f64());
580
581 tracing::info!(
582 session_id = %session_id,
583 peer = %peer_addr,
584 duration_secs = duration.as_secs(),
585 "WebSocket connection closed"
586 );
587}
588
589/// Relay messages from client to upstream with policy enforcement.
590#[allow(clippy::too_many_arguments)]
591async fn relay_client_to_upstream(
592 mut client_stream: futures_util::stream::SplitStream<WebSocket>,
593 client_sink: Arc<Mutex<futures_util::stream::SplitSink<WebSocket, Message>>>,
594 upstream_sink: Arc<
595 Mutex<
596 futures_util::stream::SplitSink<
597 tokio_tungstenite::WebSocketStream<
598 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
599 >,
600 tokio_tungstenite::tungstenite::Message,
601 >,
602 >,
603 >,
604 state: ProxyState,
605 session_id: String,
606 ws_config: WebSocketConfig,
607 rate_counter: Arc<AtomicU64>,
608 rate_window_start: Arc<std::sync::Mutex<std::time::Instant>>,
609 last_activity: Arc<AtomicU64>,
610 connection_epoch: std::time::Instant,
611 oauth_claims: Option<crate::proxy::auth::OAuthValidationEvidence>,
612 handshake_headers: HeaderMap,
613) {
614 while let Some(msg_result) = client_stream.next().await {
615 let msg = match msg_result {
616 Ok(m) => m,
617 Err(e) => {
618 tracing::debug!(session_id = %session_id, "Client WS error: {}", e);
619 break;
620 }
621 };
622
623 // SECURITY (FIND-R182-001): Update last-activity for true idle detection.
624 // SECURITY (R251-WS-1): SeqCst ensures visibility to timeout checker thread.
625 last_activity.store(
626 connection_epoch.elapsed().as_millis() as u64,
627 Ordering::SeqCst,
628 );
629
630 record_ws_message("client_to_upstream");
631
632 // SECURITY (FIND-R52-WS-003): Per-message OAuth token expiry check.
633 // After WebSocket upgrade, the HTTP auth middleware no longer runs.
634 // A token that expires mid-connection must be rejected to prevent
635 // indefinite access via a long-lived WebSocket.
636 {
637 let token_expired = state
638 .sessions
639 .get_mut(&session_id)
640 .and_then(|s| {
641 s.token_expires_at.map(|exp| {
642 let now = std::time::SystemTime::now()
643 .duration_since(std::time::UNIX_EPOCH)
644 .unwrap_or_default()
645 .as_secs();
646 now >= exp
647 })
648 })
649 .unwrap_or(false);
650 if token_expired {
651 tracing::warn!(
652 session_id = %session_id,
653 "SECURITY: OAuth token expired during WebSocket session, closing"
654 );
655 let error = json!({
656 "jsonrpc": "2.0",
657 "error": {
658 "code": -32001,
659 "message": "Session expired"
660 },
661 "id": null
662 });
663 let error_text = serde_json::to_string(&error)
664 .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","error":{"code":-32001,"message":"Session expired"},"id":null}"#.to_string());
665 let mut sink = client_sink.lock().await;
666 let _ = sink.send(Message::Text(error_text.into())).await;
667 let _ = sink
668 .send(Message::Close(Some(CloseFrame {
669 code: CLOSE_POLICY_VIOLATION,
670 reason: "Token expired".into(),
671 })))
672 .await;
673 break;
674 }
675 }
676
677 match msg {
678 Message::Text(text) => {
679 // Rate limiting
680 if !check_rate_limit(
681 &rate_counter,
682 &rate_window_start,
683 ws_config.message_rate_limit,
684 ) {
685 tracing::warn!(
686 session_id = %session_id,
687 "WebSocket rate limit exceeded, closing"
688 );
689 let mut sink = client_sink.lock().await;
690 let _ = sink
691 .send(Message::Close(Some(CloseFrame {
692 code: CLOSE_POLICY_VIOLATION,
693 reason: "Rate limit exceeded".into(),
694 })))
695 .await;
696 break;
697 }
698
699 // SECURITY (FIND-R46-005): Reject JSON with duplicate keys before parsing.
700 // Prevents parser-disagreement attacks (CVE-2017-12635, CVE-2020-16250)
701 // where the proxy evaluates one key value but upstream sees another.
702 if let Some(dup_key) = vellaveto_mcp::framing::find_duplicate_json_key(&text) {
703 tracing::warn!(
704 session_id = %session_id,
705 "SECURITY: Rejected WS message with duplicate key: \"{}\"",
706 dup_key
707 );
708 let mut sink = client_sink.lock().await;
709 let _ = sink
710 .send(Message::Close(Some(CloseFrame {
711 code: CLOSE_POLICY_VIOLATION,
712 reason: "Duplicate JSON key detected".into(),
713 })))
714 .await;
715 break;
716 }
717
718 // SECURITY (FIND-R53-WS-004): Reject WS messages with control characters.
719 // Parity with HTTP GET event_id validation (handlers.rs:2899).
720 // Control chars in JSON-RPC messages can be used for log injection
721 // or to bypass string-based security checks.
722 if text.chars().any(|c| {
723 // Allow standard JSON whitespace (\t, \n, \r) but reject other
724 // ASCII control chars and Unicode format chars (FIND-R54-011).
725 (c.is_control() && c != '\n' && c != '\r' && c != '\t')
726 || is_unicode_format_char_ws(c)
727 }) {
728 tracing::warn!(
729 session_id = %session_id,
730 "SECURITY: Rejected WS message with control characters"
731 );
732 let error =
733 make_ws_error_response(None, -32600, "Message contains control characters");
734 let mut sink = client_sink.lock().await;
735 let _ = sink.send(Message::Text(error.into())).await;
736 continue;
737 }
738
739 // Parse JSON
740 let parsed: Value = match serde_json::from_str(&text) {
741 Ok(v) => v,
742 Err(_) => {
743 tracing::warn!(
744 session_id = %session_id,
745 "Unparseable JSON in WebSocket text frame, closing (fail-closed)"
746 );
747 let mut sink = client_sink.lock().await;
748 let _ = sink
749 .send(Message::Close(Some(CloseFrame {
750 code: CLOSE_POLICY_VIOLATION,
751 reason: "Invalid JSON".into(),
752 })))
753 .await;
754 break;
755 }
756 };
757
758 // SECURITY (FIND-R46-WS-001): Injection scanning on client→upstream text frames.
759 // The HTTP proxy scans request bodies for injection; the WebSocket proxy must
760 // do the same to maintain security parity. Fail-closed: if injection is detected
761 // and blocking is enabled, deny the message.
762 if !state.injection_disabled {
763 let scannable = extract_scannable_text_from_request(&parsed);
764 if !scannable.is_empty() {
765 let injection_matches: Vec<String> =
766 if let Some(ref scanner) = state.injection_scanner {
767 scanner
768 .inspect(&scannable)
769 .into_iter()
770 .map(|s| s.to_string())
771 .collect()
772 } else {
773 inspect_for_injection(&scannable)
774 .into_iter()
775 .map(|s| s.to_string())
776 .collect()
777 };
778
779 if !injection_matches.is_empty() {
780 tracing::warn!(
781 "SECURITY: Injection in WS client request! Session: {}, Patterns: {:?}",
782 session_id,
783 injection_matches,
784 );
785
786 let verdict = if state.injection_blocking {
787 Verdict::Deny {
788 reason: format!(
789 "WS request injection blocked: {injection_matches:?}"
790 ),
791 }
792 } else {
793 Verdict::Allow
794 };
795
796 let action = Action::new(
797 "vellaveto",
798 "ws_request_injection",
799 json!({
800 "matched_patterns": injection_matches,
801 "session": session_id,
802 "transport": "websocket",
803 "direction": "client_to_upstream",
804 }),
805 );
806 let request_payload = parsed
807 .get("params")
808 .or_else(|| parsed.get("result"))
809 .cloned();
810 let injection_security_context =
811 request_payload.as_ref().map(|payload| {
812 super::helpers::parameter_injection_security_context(
813 payload,
814 state.injection_blocking,
815 "request_injection",
816 )
817 });
818 let envelope = build_secondary_acis_envelope_with_security_context(
819 &action,
820 &verdict,
821 DecisionOrigin::InjectionScanner,
822 "websocket",
823 Some(&session_id),
824 injection_security_context.as_ref(),
825 );
826 if let Err(e) = state
827 .audit
828 .log_entry_with_acis(
829 &action,
830 &verdict,
831 json!({
832 "source": "ws_proxy",
833 "event": "ws_request_injection_detected",
834 }),
835 envelope,
836 )
837 .await
838 {
839 tracing::warn!("Failed to audit WS request injection: {}", e);
840 }
841
842 if state.injection_blocking {
843 let id = parsed.get("id");
844 let error = make_ws_error_response(
845 id,
846 -32001,
847 "Request blocked: injection detected",
848 );
849 let mut sink = client_sink.lock().await;
850 let _ = sink.send(Message::Text(error.into())).await;
851 continue;
852 }
853 }
854 }
855 }
856
857 let presented_approval_id = super::helpers::extract_approval_id_from_meta(&parsed);
858
859 // Classify and evaluate
860 let classified = extractor::classify_message(&parsed);
861 match classified {
862 MessageType::ToolCall {
863 ref id,
864 ref tool_name,
865 ref arguments,
866 } => {
867 // SECURITY (FIND-R46-009): Strict tool name validation (MCP 2025-11-25).
868 // When enabled, reject tool names that don't conform to the spec format.
869 if state.streamable_http.strict_tool_name_validation {
870 if let Err(e) = vellaveto_types::validate_mcp_tool_name(tool_name) {
871 tracing::warn!(
872 session_id = %session_id,
873 "SECURITY: Rejecting invalid WS tool name '{}': {}",
874 tool_name,
875 e
876 );
877 let error =
878 make_ws_error_response(Some(id), -32602, "Invalid tool name");
879 let mut sink = client_sink.lock().await;
880 let _ = sink.send(Message::Text(error.into())).await;
881 continue;
882 }
883 }
884
885 let mut action = extractor::extract_action(tool_name, arguments);
886 let mut matched_approval_id: Option<String> = None;
887 let mut matched_approval_registry: Option<&'static str> = None;
888
889 // SECURITY (FIND-R75-002): DNS resolution for IP-based policy evaluation.
890 // Parity with HTTP handler (handlers.rs:717). Without this, policies
891 // using ip_rules are completely bypassed on the WebSocket transport.
892 if state.engine.has_ip_rules() {
893 super::helpers::resolve_domains(&mut action).await;
894 }
895
896 // SECURITY (FIND-R46-006): Call chain validation and privilege escalation check.
897 // Extract X-Upstream-Agents from the initial WS upgrade headers stored in session.
898 // For WebSocket, we sync the call chain once during upgrade and reuse it.
899 let upstream_chain = {
900 let session_ref = state.sessions.get_mut(&session_id);
901 session_ref
902 .map(|s| s.current_call_chain.clone())
903 .unwrap_or_default()
904 };
905 let current_agent_id = {
906 let session_ref = state.sessions.get_mut(&session_id);
907 session_ref.and_then(|s| s.oauth_subject.clone())
908 };
909
910 // SECURITY (FIND-R46-006): Privilege escalation detection.
911 if !upstream_chain.is_empty() {
912 let priv_check = check_privilege_escalation(
913 &state.engine,
914 &state.policies,
915 &action,
916 &upstream_chain,
917 current_agent_id.as_deref(),
918 );
919 if priv_check.escalation_detected {
920 let verdict = Verdict::Deny {
921 reason: format!(
922 "Privilege escalation: agent '{}' would be denied",
923 priv_check
924 .escalating_from_agent
925 .as_deref()
926 .unwrap_or("unknown")
927 ),
928 };
929 let privilege_escalation_security_context =
930 super::helpers::privilege_escalation_security_context(&action);
931 let envelope = build_secondary_acis_envelope_with_security_context(
932 &action,
933 &verdict,
934 DecisionOrigin::PolicyEngine,
935 "websocket",
936 Some(&session_id),
937 Some(&privilege_escalation_security_context),
938 );
939 if let Err(e) = state
940 .audit
941 .log_entry_with_acis(
942 &action,
943 &verdict,
944 json!({
945 "source": "ws_proxy",
946 "session": session_id,
947 "transport": "websocket",
948 "event": "privilege_escalation_blocked",
949 "escalating_from_agent": priv_check.escalating_from_agent,
950 "upstream_deny_reason": priv_check.upstream_deny_reason,
951 }),
952 envelope,
953 )
954 .await
955 {
956 tracing::warn!(
957 "Failed to audit WS privilege escalation: {}",
958 e
959 );
960 }
961 let error =
962 make_ws_error_response(Some(id), -32001, "Denied by policy");
963 let mut sink = client_sink.lock().await;
964 let _ = sink.send(Message::Text(error.into())).await;
965 continue;
966 }
967 }
968
969 // SECURITY (FIND-R46-007): Rug-pull detection.
970 // Block calls to tools whose annotations changed since initial tools/list.
971 // SECURITY (R240-PROXY-1): Fall back to global registry on session miss.
972 let is_flagged = state
973 .sessions
974 .get_mut(&session_id)
975 .map(|s| s.flagged_tools.contains(tool_name))
976 .unwrap_or_else(|| state.sessions.is_tool_globally_flagged(tool_name));
977 if is_flagged {
978 let verdict = Verdict::Deny {
979 reason: format!(
980 "Tool '{tool_name}' blocked: annotations changed (rug-pull detected)"
981 ),
982 };
983 let rug_pull_security_context =
984 super::helpers::rug_pull_security_context(&action);
985 let envelope = build_secondary_acis_envelope_with_security_context(
986 &action,
987 &verdict,
988 DecisionOrigin::CapabilityEnforcement,
989 "websocket",
990 Some(&session_id),
991 Some(&rug_pull_security_context),
992 );
993 if let Err(e) = state
994 .audit
995 .log_entry_with_acis(
996 &action,
997 &verdict,
998 json!({
999 "source": "ws_proxy",
1000 "session": session_id,
1001 "transport": "websocket",
1002 "event": "rug_pull_tool_blocked",
1003 "tool": tool_name,
1004 }),
1005 envelope,
1006 )
1007 .await
1008 {
1009 tracing::warn!("Failed to audit WS rug-pull block: {}", e);
1010 }
1011 let error =
1012 make_ws_error_response(Some(id), -32001, "Denied by policy");
1013 let mut sink = client_sink.lock().await;
1014 let _ = sink.send(Message::Text(error.into())).await;
1015 continue;
1016 }
1017
1018 // SECURITY (FIND-R52-WS-001): DLP scan parameters for secret exfiltration.
1019 // Matches HTTP handler's DLP check to maintain security parity.
1020 {
1021 let dlp_findings = scan_parameters_for_secrets(arguments);
1022 // SECURITY (FIND-R55-WS-001): DLP on request params always blocks,
1023 // matching HTTP handler. Previously gated on injection_blocking flag.
1024 if !dlp_findings.is_empty() {
1025 for finding in &dlp_findings {
1026 record_dlp_finding(&finding.pattern_name);
1027 }
1028 let patterns: Vec<String> = dlp_findings
1029 .iter()
1030 .map(|f| format!("{} at {}", f.pattern_name, f.location))
1031 .collect();
1032 let audit_reason = format!(
1033 "DLP: secrets detected in tool parameters: {patterns:?}"
1034 );
1035 tracing::warn!(
1036 "SECURITY: DLP blocking WS tool '{}' in session {}: {}",
1037 tool_name,
1038 session_id,
1039 audit_reason
1040 );
1041 let dlp_action = extractor::extract_action(tool_name, arguments);
1042 let dlp_verdict = Verdict::Deny {
1043 reason: audit_reason,
1044 };
1045 let parameter_security_context =
1046 super::helpers::parameter_dlp_security_context(
1047 arguments,
1048 true,
1049 "tool_parameter_dlp",
1050 );
1051 let envelope = build_secondary_acis_envelope_with_security_context(
1052 &dlp_action,
1053 &dlp_verdict,
1054 DecisionOrigin::Dlp,
1055 "websocket",
1056 Some(&session_id),
1057 Some(¶meter_security_context),
1058 );
1059 if let Err(e) = state
1060 .audit
1061 .log_entry_with_acis(
1062 &dlp_action,
1063 &dlp_verdict,
1064 json!({
1065 "source": "ws_proxy",
1066 "session": session_id,
1067 "transport": "websocket",
1068 "event": "dlp_secret_blocked",
1069 "tool": tool_name,
1070 "findings": patterns,
1071 }),
1072 envelope,
1073 )
1074 .await
1075 {
1076 tracing::warn!("Failed to audit WS DLP finding: {}", e);
1077 }
1078 let error = make_ws_error_response(
1079 Some(id),
1080 -32001,
1081 "Request blocked: security policy violation",
1082 );
1083 let mut sink = client_sink.lock().await;
1084 let _ = sink.send(Message::Text(error.into())).await;
1085 continue;
1086 }
1087 }
1088
1089 // SECURITY (FIND-R52-WS-002): Memory poisoning detection.
1090 // Check if tool call parameters contain replayed response data,
1091 // matching the HTTP handler's memory poisoning check.
1092 {
1093 let poisoning_detected = state
1094 .sessions
1095 .get_mut(&session_id)
1096 .and_then(|session| {
1097 let matches =
1098 session.memory_tracker.check_parameters(arguments);
1099 if !matches.is_empty() {
1100 for m in &matches {
1101 tracing::warn!(
1102 "SECURITY: Memory poisoning detected in WS tool '{}' (session {}): \
1103 param '{}' contains replayed data (fingerprint: {})",
1104 tool_name,
1105 session_id,
1106 m.param_location,
1107 m.fingerprint
1108 );
1109 }
1110 Some(matches.len())
1111 } else {
1112 None
1113 }
1114 });
1115 if let Some(match_count) = poisoning_detected {
1116 let poison_action = extractor::extract_action(tool_name, arguments);
1117 let deny_reason = format!(
1118 "Memory poisoning detected: {match_count} replayed data fragment(s) in tool '{tool_name}'"
1119 );
1120 let poison_verdict = Verdict::Deny {
1121 reason: deny_reason,
1122 };
1123 let poisoning_security_context =
1124 super::helpers::memory_poisoning_security_context(
1125 arguments,
1126 "memory_poisoning",
1127 );
1128 let envelope = build_secondary_acis_envelope_with_security_context(
1129 &poison_action,
1130 &poison_verdict,
1131 DecisionOrigin::MemoryPoisoning,
1132 "websocket",
1133 Some(&session_id),
1134 Some(&poisoning_security_context),
1135 );
1136 if let Err(e) = state
1137 .audit
1138 .log_entry_with_acis(
1139 &poison_action,
1140 &poison_verdict,
1141 json!({
1142 "source": "ws_proxy",
1143 "session": session_id,
1144 "transport": "websocket",
1145 "event": "memory_poisoning_detected",
1146 "matches": match_count,
1147 "tool": tool_name,
1148 }),
1149 envelope,
1150 )
1151 .await
1152 {
1153 tracing::warn!("Failed to audit WS memory poisoning: {}", e);
1154 }
1155 let error = make_ws_error_response(
1156 Some(id),
1157 -32001,
1158 "Request blocked: security policy violation",
1159 );
1160 let mut sink = client_sink.lock().await;
1161 let _ = sink.send(Message::Text(error.into())).await;
1162 continue;
1163 }
1164 }
1165
1166 // SECURITY (FIND-R46-008): Circuit breaker check.
1167 // If the circuit is open for this tool, reject immediately.
1168 if let Some(ref circuit_breaker) = state.circuit_breaker {
1169 if let Err(reason) = circuit_breaker.can_proceed(tool_name) {
1170 tracing::warn!(
1171 session_id = %session_id,
1172 "SECURITY: WS circuit breaker open for tool '{}': {}",
1173 tool_name,
1174 reason
1175 );
1176 let verdict = Verdict::Deny {
1177 reason: format!("Circuit breaker open: {reason}"),
1178 };
1179 // SECURITY (R251-ACIS-1): Use CircuitBreaker origin, not RateLimiter.
1180 let circuit_breaker_security_context =
1181 super::helpers::circuit_breaker_security_context(&action);
1182 let envelope = build_secondary_acis_envelope_with_security_context(
1183 &action,
1184 &verdict,
1185 DecisionOrigin::CircuitBreaker,
1186 "websocket",
1187 Some(&session_id),
1188 Some(&circuit_breaker_security_context),
1189 );
1190 if let Err(e) = state
1191 .audit
1192 .log_entry_with_acis(
1193 &action,
1194 &verdict,
1195 json!({
1196 "source": "ws_proxy",
1197 "session": session_id,
1198 "transport": "websocket",
1199 "event": "circuit_breaker_rejected",
1200 "tool": tool_name,
1201 }),
1202 envelope,
1203 )
1204 .await
1205 {
1206 tracing::warn!(
1207 "Failed to audit WS circuit breaker rejection: {}",
1208 e
1209 );
1210 }
1211 let error = make_ws_error_response(
1212 Some(id),
1213 -32001,
1214 "Service temporarily unavailable",
1215 );
1216 let mut sink = client_sink.lock().await;
1217 let _ = sink.send(Message::Text(error.into())).await;
1218 continue;
1219 }
1220 }
1221
1222 // SECURITY (FIND-R46-013): Tool registry trust check.
1223 // If tool registry is configured, check trust level before evaluation.
1224 if let Some(ref registry) = state.tool_registry {
1225 let registry_eval_ctx = state
1226 .sessions
1227 .get(&session_id)
1228 .map(|session| EvaluationContext {
1229 timestamp: None,
1230 agent_id: session.oauth_subject.clone(),
1231 agent_identity: session.agent_identity.clone(),
1232 call_counts: session.call_counts.clone(),
1233 previous_actions: session
1234 .action_history
1235 .iter()
1236 .cloned()
1237 .collect(),
1238 call_chain: session.current_call_chain.clone(),
1239 tenant_id: None,
1240 verification_tier: None,
1241 capability_token: None,
1242 session_state: None,
1243 })
1244 .unwrap_or_default();
1245 let registry_runtime_security_context =
1246 build_ws_runtime_security_context(
1247 &parsed,
1248 &action,
1249 &handshake_headers,
1250 super::helpers::TransportSecurityInputs {
1251 oauth_evidence: oauth_claims.as_ref(),
1252 eval_ctx: Some(®istry_eval_ctx),
1253 sessions: &state.sessions,
1254 session_id: Some(&session_id),
1255 trusted_request_signers: &state.trusted_request_signers,
1256 detached_signature_freshness: state
1257 .detached_signature_freshness,
1258 },
1259 );
1260 let trust = registry.check_trust_level(tool_name).await;
1261 match trust {
1262 vellaveto_mcp::tool_registry::TrustLevel::Unknown => {
1263 match super::helpers::presented_approval_matches_action(
1264 &state,
1265 &session_id,
1266 presented_approval_id.as_deref(),
1267 &action,
1268 )
1269 .await
1270 {
1271 Ok(Some(approval_id)) => {
1272 matched_approval_id = Some(approval_id);
1273 matched_approval_registry = Some("unknown_tool");
1274 }
1275 Ok(None) => {
1276 registry.register_unknown(tool_name).await;
1277 let verdict = Verdict::Deny {
1278 reason: "Unknown tool requires approval"
1279 .to_string(),
1280 };
1281 let approval_security_context =
1282 super::helpers::unknown_tool_approval_gate_security_context(&action);
1283 let combined_security_context =
1284 super::helpers::merge_transport_security_context(
1285 registry_runtime_security_context.as_ref(),
1286 Some(&approval_security_context),
1287 );
1288 let effective_security_context =
1289 combined_security_context
1290 .as_ref()
1291 .unwrap_or(&approval_security_context);
1292 let envelope =
1293 build_secondary_acis_envelope_with_security_context(
1294 &action,
1295 &verdict,
1296 DecisionOrigin::PolicyEngine,
1297 "websocket",
1298 Some(&session_id),
1299 Some(effective_security_context),
1300 );
1301 if let Err(e) = state
1302 .audit
1303 .log_entry_with_acis(
1304 &action,
1305 &verdict,
1306 json!({
1307 "source": "ws_proxy",
1308 "session": session_id,
1309 "transport": "websocket",
1310 "registry": "unknown_tool",
1311 "tool": tool_name,
1312 }),
1313 envelope,
1314 )
1315 .await
1316 {
1317 tracing::warn!(
1318 "Failed to audit WS unknown tool: {}",
1319 e
1320 );
1321 }
1322 let approval_reason = "Approval required";
1323 let containment_context =
1324 super::helpers::approval_containment_context_from_security_context(
1325 effective_security_context,
1326 approval_reason,
1327 );
1328 let approval_id =
1329 super::helpers::create_pending_approval_with_context(
1330 &state,
1331 &session_id,
1332 &action,
1333 approval_reason,
1334 containment_context,
1335 )
1336 .await;
1337 let error = make_ws_error_response_with_data(
1338 Some(id),
1339 -32001,
1340 approval_reason,
1341 Some(json!({
1342 "verdict": "require_approval",
1343 "reason": approval_reason,
1344 "approval_id": approval_id,
1345 })),
1346 );
1347 let mut sink = client_sink.lock().await;
1348 let _ = sink.send(Message::Text(error.into())).await;
1349 continue;
1350 }
1351 Err(()) => {
1352 let verdict = Verdict::Deny {
1353 reason: INVALID_PRESENTED_APPROVAL_REASON
1354 .to_string(),
1355 };
1356 let invalid_approval_security_context =
1357 super::helpers::invalid_presented_approval_security_context(&action);
1358 let combined_security_context =
1359 super::helpers::merge_transport_security_context(
1360 registry_runtime_security_context.as_ref(),
1361 Some(&invalid_approval_security_context),
1362 );
1363 let effective_security_context =
1364 combined_security_context
1365 .as_ref()
1366 .unwrap_or(&invalid_approval_security_context);
1367 let envelope =
1368 build_secondary_acis_envelope_with_security_context(
1369 &action,
1370 &verdict,
1371 DecisionOrigin::PolicyEngine,
1372 "websocket",
1373 Some(&session_id),
1374 Some(effective_security_context),
1375 );
1376 let _ = state
1377 .audit
1378 .log_entry_with_acis(
1379 &action,
1380 &verdict,
1381 json!({
1382 "source": "ws_proxy",
1383 "session": session_id,
1384 "transport": "websocket",
1385 "registry": "unknown_tool",
1386 "event": "presented_approval_replay_denied",
1387 "approval_id": presented_approval_id,
1388 }),
1389 envelope,
1390 )
1391 .await;
1392 let error = make_ws_error_response(
1393 Some(id),
1394 -32001,
1395 "Denied by policy",
1396 );
1397 let mut sink = client_sink.lock().await;
1398 let _ = sink.send(Message::Text(error.into())).await;
1399 continue;
1400 }
1401 }
1402 }
1403 vellaveto_mcp::tool_registry::TrustLevel::Untrusted {
1404 score: _,
1405 } => {
1406 match super::helpers::presented_approval_matches_action(
1407 &state,
1408 &session_id,
1409 presented_approval_id.as_deref(),
1410 &action,
1411 )
1412 .await
1413 {
1414 Ok(Some(approval_id)) => {
1415 matched_approval_id = Some(approval_id);
1416 matched_approval_registry = Some("untrusted_tool");
1417 }
1418 Ok(None) => {
1419 let verdict = Verdict::Deny {
1420 reason: "Untrusted tool requires approval"
1421 .to_string(),
1422 };
1423 let approval_security_context =
1424 super::helpers::untrusted_tool_approval_gate_security_context(&action);
1425 let combined_security_context =
1426 super::helpers::merge_transport_security_context(
1427 registry_runtime_security_context.as_ref(),
1428 Some(&approval_security_context),
1429 );
1430 let effective_security_context =
1431 combined_security_context
1432 .as_ref()
1433 .unwrap_or(&approval_security_context);
1434 let envelope =
1435 build_secondary_acis_envelope_with_security_context(
1436 &action,
1437 &verdict,
1438 DecisionOrigin::PolicyEngine,
1439 "websocket",
1440 Some(&session_id),
1441 Some(effective_security_context),
1442 );
1443 if let Err(e) = state
1444 .audit
1445 .log_entry_with_acis(
1446 &action,
1447 &verdict,
1448 json!({
1449 "source": "ws_proxy",
1450 "session": session_id,
1451 "transport": "websocket",
1452 "registry": "untrusted_tool",
1453 "tool": tool_name,
1454 }),
1455 envelope,
1456 )
1457 .await
1458 {
1459 tracing::warn!(
1460 "Failed to audit WS untrusted tool: {}",
1461 e
1462 );
1463 }
1464 let approval_reason = "Approval required";
1465 let containment_context =
1466 super::helpers::approval_containment_context_from_security_context(
1467 effective_security_context,
1468 approval_reason,
1469 );
1470 let approval_id =
1471 super::helpers::create_pending_approval_with_context(
1472 &state,
1473 &session_id,
1474 &action,
1475 approval_reason,
1476 containment_context,
1477 )
1478 .await;
1479 let error = make_ws_error_response_with_data(
1480 Some(id),
1481 -32001,
1482 approval_reason,
1483 Some(json!({
1484 "verdict": "require_approval",
1485 "reason": approval_reason,
1486 "approval_id": approval_id,
1487 })),
1488 );
1489 let mut sink = client_sink.lock().await;
1490 let _ = sink.send(Message::Text(error.into())).await;
1491 continue;
1492 }
1493 Err(()) => {
1494 let verdict = Verdict::Deny {
1495 reason: INVALID_PRESENTED_APPROVAL_REASON
1496 .to_string(),
1497 };
1498 let invalid_approval_security_context =
1499 super::helpers::invalid_presented_approval_security_context(&action);
1500 let combined_security_context =
1501 super::helpers::merge_transport_security_context(
1502 registry_runtime_security_context.as_ref(),
1503 Some(&invalid_approval_security_context),
1504 );
1505 let effective_security_context =
1506 combined_security_context
1507 .as_ref()
1508 .unwrap_or(&invalid_approval_security_context);
1509 let envelope =
1510 build_secondary_acis_envelope_with_security_context(
1511 &action,
1512 &verdict,
1513 DecisionOrigin::PolicyEngine,
1514 "websocket",
1515 Some(&session_id),
1516 Some(effective_security_context),
1517 );
1518 let _ = state
1519 .audit
1520 .log_entry_with_acis(
1521 &action,
1522 &verdict,
1523 json!({
1524 "source": "ws_proxy",
1525 "session": session_id,
1526 "transport": "websocket",
1527 "registry": "untrusted_tool",
1528 "event": "presented_approval_replay_denied",
1529 "approval_id": presented_approval_id,
1530 }),
1531 envelope,
1532 )
1533 .await;
1534 let error = make_ws_error_response(
1535 Some(id),
1536 -32001,
1537 "Denied by policy",
1538 );
1539 let mut sink = client_sink.lock().await;
1540 let _ = sink.send(Message::Text(error.into())).await;
1541 continue;
1542 }
1543 }
1544 }
1545 vellaveto_mcp::tool_registry::TrustLevel::Trusted => {
1546 // Trusted — proceed to engine evaluation
1547 }
1548 }
1549 }
1550
1551 // SECURITY (FIND-R130-002): Combine context read, evaluation,
1552 // and session update into a single block holding the DashMap
1553 // shard lock. Without this, concurrent WS connections sharing
1554 // a session can bypass max_calls_in_window by racing: both
1555 // clone the same stale call_counts, both pass evaluation, both
1556 // increment. Matches HTTP handler R19-TOCTOU pattern
1557 // (handlers.rs:725-789).
1558
1559 // Pre-compute security context BEFORE acquiring DashMap write lock
1560 // to avoid deadlock — build_ws_runtime_security_context internally
1561 // calls sessions.get()/get_mut() which would re-enter the lock.
1562 let pre_eval_ctx = state
1563 .sessions
1564 .get(&session_id)
1565 .map(|session| EvaluationContext {
1566 timestamp: None,
1567 agent_id: session.oauth_subject.clone(),
1568 agent_identity: session.agent_identity.clone(),
1569 call_chain: session.current_call_chain.clone(),
1570 ..EvaluationContext::default()
1571 })
1572 .unwrap_or_else(|| EvaluationContext {
1573 agent_id: oauth_claims.as_ref().map(|claims| claims.sub.clone()),
1574 ..EvaluationContext::default()
1575 });
1576 let security_context = build_ws_runtime_security_context(
1577 &parsed,
1578 &action,
1579 &handshake_headers,
1580 super::helpers::TransportSecurityInputs {
1581 oauth_evidence: oauth_claims.as_ref(),
1582 eval_ctx: Some(&pre_eval_ctx),
1583 sessions: &state.sessions,
1584 session_id: Some(&session_id),
1585 trusted_request_signers: &state.trusted_request_signers,
1586 detached_signature_freshness: state.detached_signature_freshness,
1587 },
1588 );
1589
1590 // TOCTOU-safe evaluation + session update (write lock)
1591 let (mediation_result, ctx) = if let Some(mut session) =
1592 state.sessions.get_mut(&session_id)
1593 {
1594 let ctx = EvaluationContext {
1595 timestamp: None,
1596 agent_id: session.oauth_subject.clone(),
1597 agent_identity: session.agent_identity.clone(),
1598 call_counts: session.call_counts.clone(),
1599 previous_actions: session.action_history.iter().cloned().collect(),
1600 call_chain: session.current_call_chain.clone(),
1601 tenant_id: None,
1602 verification_tier: None,
1603 capability_token: None,
1604 session_state: None,
1605 };
1606 let result = mediate_with_security_context(
1607 &uuid::Uuid::new_v4().to_string().replace('-', ""),
1608 &action,
1609 &state.engine,
1610 Some(&ctx),
1611 security_context.as_ref(),
1612 "websocket",
1613 &state.mediation_config,
1614 Some(&session_id),
1615 None,
1616 );
1617
1618 // Atomically update session on Allow while still holding
1619 // the shard lock — prevents TOCTOU bypass of call limits.
1620 if matches!(result.verdict, Verdict::Allow) {
1621 session.touch();
1622 use crate::proxy::call_chain::{
1623 MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
1624 };
1625 if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
1626 || session.call_counts.contains_key(tool_name)
1627 {
1628 let count = session
1629 .call_counts
1630 .entry(tool_name.to_string())
1631 .or_insert(0);
1632 *count = count.saturating_add(1);
1633 }
1634 if session.action_history.len() >= MAX_ACTION_HISTORY {
1635 session.action_history.pop_front();
1636 }
1637 session.action_history.push_back(tool_name.to_string());
1638 }
1639
1640 (result, ctx)
1641 } else {
1642 // No session — evaluate without context (fail-closed)
1643 let ctx = EvaluationContext::default();
1644 let result = mediate_with_security_context(
1645 &uuid::Uuid::new_v4().to_string().replace('-', ""),
1646 &action,
1647 &state.engine,
1648 None,
1649 security_context.as_ref(),
1650 "websocket",
1651 &state.mediation_config,
1652 Some(&session_id),
1653 None,
1654 );
1655 (result, ctx)
1656 };
1657
1658 let mut final_origin = mediation_result.origin;
1659 let mut acis_envelope = mediation_result.envelope.clone();
1660 let mut refresh_envelope = false;
1661 let verdict = match mediation_result.verdict {
1662 Verdict::RequireApproval { reason } => {
1663 if matched_approval_id.is_some() {
1664 final_origin = DecisionOrigin::PolicyEngine;
1665 refresh_envelope = true;
1666 Verdict::Allow
1667 } else {
1668 match super::helpers::presented_approval_matches_action(
1669 &state,
1670 &session_id,
1671 presented_approval_id.as_deref(),
1672 &action,
1673 )
1674 .await
1675 {
1676 Ok(Some(approval_id)) => {
1677 matched_approval_id = Some(approval_id);
1678 final_origin = DecisionOrigin::PolicyEngine;
1679 refresh_envelope = true;
1680 Verdict::Allow
1681 }
1682 Ok(None) => Verdict::RequireApproval { reason },
1683 Err(()) => Verdict::Deny {
1684 reason: {
1685 final_origin = DecisionOrigin::ApprovalGate;
1686 refresh_envelope = true;
1687 INVALID_PRESENTED_APPROVAL_REASON.to_string()
1688 },
1689 },
1690 }
1691 }
1692 }
1693 other => other,
1694 };
1695 if refresh_envelope {
1696 acis_envelope = refresh_ws_acis_envelope(
1697 &acis_envelope,
1698 &action,
1699 &verdict,
1700 final_origin,
1701 &session_id,
1702 &ctx,
1703 security_context.as_ref(),
1704 );
1705 }
1706
1707 match verdict {
1708 Verdict::Allow => {
1709 // Phase 21: ABAC refinement — only runs when ABAC engine is configured
1710 if let Some(ref abac) = state.abac_engine {
1711 let principal_id =
1712 ctx.agent_id.as_deref().unwrap_or("anonymous");
1713 let principal_type = ctx.principal_type();
1714 let session_risk = state
1715 .sessions
1716 .get_mut(&session_id)
1717 .and_then(|s| s.risk_score.clone());
1718 let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
1719 eval_ctx: &ctx,
1720 principal_type,
1721 principal_id,
1722 risk_score: session_risk.as_ref(),
1723 };
1724 match abac.evaluate(&action, &abac_ctx) {
1725 vellaveto_engine::abac::AbacDecision::Deny {
1726 policy_id,
1727 reason,
1728 } => {
1729 let deny_verdict = Verdict::Deny {
1730 reason: reason.clone(),
1731 };
1732 let abac_security_context =
1733 super::helpers::abac_deny_security_context(&action);
1734 let envelope =
1735 build_secondary_acis_envelope_with_security_context(
1736 &action,
1737 &deny_verdict,
1738 DecisionOrigin::PolicyEngine,
1739 "websocket",
1740 Some(&session_id),
1741 Some(&abac_security_context),
1742 );
1743 if let Err(e) = state
1744 .audit
1745 .log_entry_with_acis(
1746 &action,
1747 &deny_verdict,
1748 json!({
1749 "source": "ws_proxy",
1750 "session": session_id,
1751 "transport": "websocket",
1752 "event": "abac_deny",
1753 "abac_policy": policy_id,
1754 }),
1755 envelope,
1756 )
1757 .await
1758 {
1759 tracing::warn!(
1760 "Failed to audit WS ABAC deny: {}",
1761 e
1762 );
1763 }
1764 // SECURITY (FIND-R46-012): Generic message to client;
1765 // detailed reason (ABAC policy_id, reason) is in
1766 // the audit log only.
1767 let error_resp = make_ws_error_response(
1768 Some(id),
1769 -32001,
1770 "Denied by policy",
1771 );
1772 let mut sink = client_sink.lock().await;
1773 let _ =
1774 sink.send(Message::Text(error_resp.into())).await;
1775 continue;
1776 }
1777 vellaveto_engine::abac::AbacDecision::Allow {
1778 policy_id,
1779 } => {
1780 if let Some(ref la) = state.least_agency {
1781 la.record_usage(
1782 principal_id,
1783 &session_id,
1784 &policy_id,
1785 tool_name,
1786 &action.function,
1787 );
1788 }
1789 }
1790 vellaveto_engine::abac::AbacDecision::NoMatch => {
1791 // Fall through — existing Allow stands
1792 }
1793 #[allow(unreachable_patterns)]
1794 // AbacDecision is #[non_exhaustive]
1795 _ => {
1796 // SECURITY (FIND-R74-002): Future variants — fail-closed (deny).
1797 // Must send deny and continue, not fall through to Allow path.
1798 tracing::warn!(
1799 "Unknown AbacDecision variant — fail-closed"
1800 );
1801 let error_resp = make_ws_error_response(
1802 Some(id),
1803 -32001,
1804 "Denied by policy",
1805 );
1806 let mut sink = client_sink.lock().await;
1807 let _ =
1808 sink.send(Message::Text(error_resp.into())).await;
1809 continue;
1810 }
1811 }
1812 }
1813
1814 // SECURITY (FIND-R46-013): Record tool call in registry on Allow
1815 if let Some(ref registry) = state.tool_registry {
1816 registry.record_call(tool_name).await;
1817 }
1818
1819 // NOTE: Session touch + call_counts/action_history
1820 // update already performed inside the TOCTOU-safe
1821 // block above (FIND-R130-002). No separate update here.
1822
1823 if super::helpers::consume_presented_approval(
1824 &state,
1825 &session_id,
1826 matched_approval_id.as_deref(),
1827 &action,
1828 security_context.as_ref(),
1829 )
1830 .await
1831 .is_err()
1832 {
1833 let deny_verdict = Verdict::Deny {
1834 reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
1835 };
1836 let invalid_approval_security_context =
1837 super::helpers::invalid_presented_approval_security_context(
1838 &action,
1839 );
1840 let combined_security_context =
1841 super::helpers::merge_transport_security_context(
1842 security_context.as_ref(),
1843 Some(&invalid_approval_security_context),
1844 );
1845 let effective_security_context = combined_security_context
1846 .as_ref()
1847 .unwrap_or(&invalid_approval_security_context);
1848 let envelope =
1849 build_secondary_acis_envelope_with_security_context(
1850 &action,
1851 &deny_verdict,
1852 DecisionOrigin::ApprovalGate,
1853 "websocket",
1854 Some(&session_id),
1855 Some(effective_security_context),
1856 );
1857 let mut audit_metadata = json!({
1858 "source": "ws_proxy",
1859 "session": session_id,
1860 "transport": "websocket",
1861 "event": "presented_approval_replay_denied",
1862 "approval_id": matched_approval_id,
1863 });
1864 if let Some(registry) = matched_approval_registry {
1865 audit_metadata["registry"] = json!(registry);
1866 }
1867 let _ = state
1868 .audit
1869 .log_entry_with_acis(
1870 &action,
1871 &deny_verdict,
1872 audit_metadata,
1873 envelope,
1874 )
1875 .await;
1876 let error_resp = make_ws_error_response(
1877 Some(id),
1878 -32001,
1879 "Denied by policy",
1880 );
1881 let mut sink = client_sink.lock().await;
1882 let _ = sink.send(Message::Text(error_resp.into())).await;
1883 continue;
1884 }
1885
1886 if let Err(e) = state
1887 .audit
1888 .log_entry_with_acis(
1889 &action,
1890 &Verdict::Allow,
1891 json!({
1892 "source": "ws_proxy",
1893 "session": session_id,
1894 "transport": "websocket",
1895 }),
1896 acis_envelope,
1897 )
1898 .await
1899 {
1900 tracing::error!(
1901 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1902 e
1903 );
1904 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1905 // No unaudited security decisions can occur.
1906 if state.audit_strict_mode {
1907 let error = make_ws_error_response(
1908 Some(id),
1909 -32000,
1910 "Audit logging failed — request denied (strict audit mode)",
1911 );
1912 let mut sink = client_sink.lock().await;
1913 let _ = sink.send(Message::Text(error.into())).await;
1914 continue;
1915 }
1916 }
1917
1918 // Canonicalize and forward
1919 let forward_text = if state.canonicalize {
1920 match serde_json::to_string(&parsed) {
1921 Ok(canonical) => canonical,
1922 Err(e) => {
1923 tracing::error!(
1924 "SECURITY: WS canonicalization failed: {}",
1925 e
1926 );
1927 let error_resp = make_ws_error_response(
1928 Some(id),
1929 -32603,
1930 "Internal error",
1931 );
1932 let mut sink = client_sink.lock().await;
1933 let _ =
1934 sink.send(Message::Text(error_resp.into())).await;
1935 continue;
1936 }
1937 }
1938 } else {
1939 text.to_string()
1940 };
1941
1942 // Track request→response mapping for output-schema
1943 // enforcement when upstream omits result._meta.tool.
1944 track_pending_tool_call(
1945 &state.sessions,
1946 &session_id,
1947 id,
1948 tool_name,
1949 );
1950
1951 let mut sink = upstream_sink.lock().await;
1952 if let Err(e) = sink
1953 .send(tokio_tungstenite::tungstenite::Message::Text(
1954 forward_text.into(),
1955 ))
1956 .await
1957 {
1958 tracing::error!(
1959 session_id = %session_id,
1960 "Failed to forward to upstream: {}",
1961 e
1962 );
1963 break;
1964 }
1965 }
1966 Verdict::Deny { ref reason } => {
1967 // Audit the denial with detailed reason
1968 let mut extra = json!({
1969 "source": "ws_proxy",
1970 "session": session_id,
1971 "transport": "websocket",
1972 "tool": tool_name,
1973 });
1974 if reason == INVALID_PRESENTED_APPROVAL_REASON
1975 && presented_approval_id.is_some()
1976 {
1977 extra["event"] = json!("presented_approval_replay_denied");
1978 extra["approval_id"] = json!(presented_approval_id);
1979 }
1980 if let Err(e) = state
1981 .audit
1982 .log_entry_with_acis(&action, &verdict, extra, acis_envelope)
1983 .await
1984 {
1985 tracing::error!(
1986 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
1987 e
1988 );
1989 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
1990 if state.audit_strict_mode {
1991 let error = make_ws_error_response(
1992 Some(id),
1993 -32000,
1994 "Audit logging failed — request denied (strict audit mode)",
1995 );
1996 let mut sink = client_sink.lock().await;
1997 let _ = sink.send(Message::Text(error.into())).await;
1998 continue;
1999 }
2000 }
2001
2002 // SECURITY (FIND-R46-012): Generic message to client.
2003 // Detailed reason is in the audit log only.
2004 let _ = reason; // used in audit above
2005 let error =
2006 make_ws_error_response(Some(id), -32001, "Denied by policy");
2007 let mut sink = client_sink.lock().await;
2008 let _ = sink.send(Message::Text(error.into())).await;
2009 }
2010 Verdict::RequireApproval { ref reason, .. } => {
2011 let approval_verdict = Verdict::RequireApproval {
2012 reason: reason.clone(),
2013 };
2014 let containment_context =
2015 super::helpers::approval_containment_context_from_envelope(
2016 &acis_envelope,
2017 reason,
2018 );
2019 if let Err(e) = state
2020 .audit
2021 .log_entry_with_acis(
2022 &action,
2023 &approval_verdict,
2024 json!({
2025 "source": "ws_proxy",
2026 "session": session_id,
2027 "transport": "websocket",
2028 }),
2029 acis_envelope,
2030 )
2031 .await
2032 {
2033 tracing::error!(
2034 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2035 e
2036 );
2037 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2038 if state.audit_strict_mode {
2039 let error = make_ws_error_response(
2040 Some(id),
2041 -32000,
2042 "Audit logging failed — request denied (strict audit mode)",
2043 );
2044 let mut sink = client_sink.lock().await;
2045 let _ = sink.send(Message::Text(error.into())).await;
2046 continue;
2047 }
2048 }
2049 let approval_reason = "Approval required";
2050 let approval_id =
2051 super::helpers::create_pending_approval_with_context(
2052 &state,
2053 &session_id,
2054 &action,
2055 reason,
2056 containment_context,
2057 )
2058 .await;
2059 let error = make_ws_error_response_with_data(
2060 Some(id),
2061 -32001,
2062 approval_reason,
2063 Some(json!({
2064 "verdict": "require_approval",
2065 "reason": approval_reason,
2066 "approval_id": approval_id,
2067 })),
2068 );
2069 let mut sink = client_sink.lock().await;
2070 let _ = sink.send(Message::Text(error.into())).await;
2071 }
2072 // Fail-closed: unknown Verdict variants produce Deny
2073 _ => {
2074 let error =
2075 make_ws_error_response(Some(id), -32001, "Denied by policy");
2076 let mut sink = client_sink.lock().await;
2077 let _ = sink.send(Message::Text(error.into())).await;
2078 }
2079 }
2080 }
2081 MessageType::ResourceRead { ref id, ref uri } => {
2082 // SECURITY (FIND-R74-007): Check for memory poisoning in resource URI.
2083 // ResourceRead is a likely exfiltration vector: a poisoned tool response
2084 // says "read this file" and the agent issues resources/read for that URI.
2085 // Parity with HTTP handler (handlers.rs:1472).
2086 {
2087 let poisoning_detected = state
2088 .sessions
2089 .get_mut(&session_id)
2090 .and_then(|session| {
2091 let uri_params = json!({"uri": uri});
2092 let matches =
2093 session.memory_tracker.check_parameters(&uri_params);
2094 if !matches.is_empty() {
2095 for m in &matches {
2096 tracing::warn!(
2097 "SECURITY: Memory poisoning detected in WS resources/read (session {}): \
2098 param '{}' contains replayed data (fingerprint: {})",
2099 session_id,
2100 m.param_location,
2101 m.fingerprint
2102 );
2103 }
2104 Some(matches.len())
2105 } else {
2106 None
2107 }
2108 });
2109 if let Some(match_count) = poisoning_detected {
2110 let poison_action = extractor::extract_resource_action(uri);
2111 let deny_reason = format!(
2112 "Memory poisoning detected: {match_count} replayed data fragment(s) in resources/read"
2113 );
2114 let resource_poison_verdict = Verdict::Deny {
2115 reason: deny_reason.clone(),
2116 };
2117 let poisoning_security_context =
2118 super::helpers::memory_poisoning_security_context(
2119 &json!({ "uri": uri }),
2120 "memory_poisoning",
2121 );
2122 let envelope = build_secondary_acis_envelope_with_security_context(
2123 &poison_action,
2124 &resource_poison_verdict,
2125 DecisionOrigin::MemoryPoisoning,
2126 "websocket",
2127 Some(&session_id),
2128 Some(&poisoning_security_context),
2129 );
2130 if let Err(e) = state
2131 .audit
2132 .log_entry_with_acis(
2133 &poison_action,
2134 &resource_poison_verdict,
2135 json!({
2136 "source": "ws_proxy",
2137 "session": session_id,
2138 "transport": "websocket",
2139 "event": "memory_poisoning_detected",
2140 "matches": match_count,
2141 "uri": uri,
2142 }),
2143 envelope,
2144 )
2145 .await
2146 {
2147 tracing::warn!(
2148 "Failed to audit WS resource memory poisoning: {}",
2149 e
2150 );
2151 }
2152 let error = make_ws_error_response(
2153 Some(id),
2154 -32001,
2155 "Request blocked: security policy violation",
2156 );
2157 let mut sink = client_sink.lock().await;
2158 let _ = sink.send(Message::Text(error.into())).await;
2159 continue;
2160 }
2161 }
2162
2163 // SECURITY (FIND-R115-041): Rug-pull detection for resource URIs.
2164 // If the upstream server was flagged (annotations changed since initial
2165 // tools/list), block resource reads from that server.
2166 // Parity with HTTP handler (handlers.rs:1555).
2167 // SECURITY (R240-PROXY-1): Fall back to global registry on session miss.
2168 {
2169 let is_flagged = state
2170 .sessions
2171 .get_mut(&session_id)
2172 .map(|s| s.flagged_tools.contains(uri.as_str()))
2173 .unwrap_or_else(|| {
2174 state.sessions.is_tool_globally_flagged(uri.as_str())
2175 });
2176 if is_flagged {
2177 let action = extractor::extract_resource_action(uri);
2178 let verdict = Verdict::Deny {
2179 reason: format!(
2180 "Resource '{uri}' blocked: server flagged by rug-pull detection"
2181 ),
2182 };
2183 let rug_pull_security_context =
2184 super::helpers::rug_pull_security_context(&action);
2185 let envelope = build_secondary_acis_envelope_with_security_context(
2186 &action,
2187 &verdict,
2188 DecisionOrigin::CapabilityEnforcement,
2189 "websocket",
2190 Some(&session_id),
2191 Some(&rug_pull_security_context),
2192 );
2193 if let Err(e) = state
2194 .audit
2195 .log_entry_with_acis(
2196 &action,
2197 &verdict,
2198 json!({
2199 "source": "ws_proxy",
2200 "session": session_id,
2201 "transport": "websocket",
2202 "event": "rug_pull_resource_blocked",
2203 "uri": uri,
2204 }),
2205 envelope,
2206 )
2207 .await
2208 {
2209 tracing::warn!(
2210 "Failed to audit WS resource rug-pull block: {}",
2211 e
2212 );
2213 }
2214 let error =
2215 make_ws_error_response(Some(id), -32001, "Denied by policy");
2216 let mut sink = client_sink.lock().await;
2217 let _ = sink.send(Message::Text(error.into())).await;
2218 continue;
2219 }
2220 }
2221
2222 // Build action for resource read
2223 let mut action = extractor::extract_resource_action(uri);
2224 let mut matched_approval_id: Option<String> = None;
2225
2226 // SECURITY (FIND-R75-002): DNS resolution for resource reads.
2227 // Parity with HTTP handler (handlers.rs:1543).
2228 if state.engine.has_ip_rules() {
2229 super::helpers::resolve_domains(&mut action).await;
2230 }
2231
2232 // SECURITY (FIND-R116-004): DLP scan on resource URI.
2233 // Parity with HTTP handler (handlers.rs:1598).
2234 {
2235 let uri_params = json!({"uri": uri});
2236 let dlp_findings = scan_parameters_for_secrets(&uri_params);
2237 if !dlp_findings.is_empty() {
2238 for finding in &dlp_findings {
2239 record_dlp_finding(&finding.pattern_name);
2240 }
2241 tracing::warn!(
2242 "SECURITY: Secret detected in WS resource URI! Session: {}, URI: [redacted]",
2243 session_id,
2244 );
2245 let audit_verdict = Verdict::Deny {
2246 reason: "DLP blocked: secret detected in resource URI"
2247 .to_string(),
2248 };
2249 let parameter_security_context =
2250 super::helpers::parameter_dlp_security_context(
2251 &uri_params,
2252 true,
2253 "resource_uri_dlp",
2254 );
2255 let envelope = build_secondary_acis_envelope_with_security_context(
2256 &action,
2257 &audit_verdict,
2258 DecisionOrigin::Dlp,
2259 "websocket",
2260 Some(&session_id),
2261 Some(¶meter_security_context),
2262 );
2263 if let Err(e) = state.audit.log_entry_with_acis(
2264 &action, &audit_verdict,
2265 json!({
2266 "source": "ws_proxy", "session": session_id,
2267 "transport": "websocket", "event": "resource_uri_dlp_alert",
2268 }),
2269 envelope,
2270 ).await {
2271 tracing::warn!("Failed to audit WS resource URI DLP: {}", e);
2272 }
2273 let error = make_ws_error_response(
2274 Some(id),
2275 -32001,
2276 "Request blocked: security policy violation",
2277 );
2278 let mut sink = client_sink.lock().await;
2279 let _ = sink.send(Message::Text(error.into())).await;
2280 continue;
2281 }
2282 }
2283
2284 // SECURITY (FIND-R115-042): Circuit breaker check for resource reads.
2285 // Parity with HTTP handler (handlers.rs:1668) — prevent resource reads
2286 // from hammering a failing upstream server.
2287 if let Some(ref circuit_breaker) = state.circuit_breaker {
2288 if let Err(reason) = circuit_breaker.can_proceed(uri) {
2289 tracing::warn!(
2290 "SECURITY: WS circuit breaker open for resource '{}' in session {}: {}",
2291 uri,
2292 session_id,
2293 reason
2294 );
2295 let verdict = Verdict::Deny {
2296 reason: format!("Circuit breaker open: {reason}"),
2297 };
2298 // SECURITY (R251-ACIS-1): Use CircuitBreaker origin, not RateLimiter.
2299 let circuit_breaker_security_context =
2300 super::helpers::circuit_breaker_security_context(&action);
2301 let envelope = build_secondary_acis_envelope_with_security_context(
2302 &action,
2303 &verdict,
2304 DecisionOrigin::CircuitBreaker,
2305 "websocket",
2306 Some(&session_id),
2307 Some(&circuit_breaker_security_context),
2308 );
2309 if let Err(e) = state
2310 .audit
2311 .log_entry_with_acis(
2312 &action,
2313 &verdict,
2314 json!({
2315 "source": "ws_proxy",
2316 "session": session_id,
2317 "transport": "websocket",
2318 "event": "circuit_breaker_rejected",
2319 "uri": uri,
2320 }),
2321 envelope,
2322 )
2323 .await
2324 {
2325 tracing::warn!(
2326 "Failed to audit WS resource circuit breaker rejection: {}",
2327 e
2328 );
2329 }
2330 let error = make_ws_error_response(
2331 Some(id),
2332 -32001,
2333 "Service temporarily unavailable",
2334 );
2335 let mut sink = client_sink.lock().await;
2336 let _ = sink.send(Message::Text(error.into())).await;
2337 continue;
2338 }
2339 }
2340
2341 // SECURITY (FIND-R130-002): TOCTOU-safe context+eval+update
2342 // for resource reads. Matches ToolCall fix above and HTTP
2343 // handler FIND-R112-002 pattern (handlers.rs:1711-1774).
2344
2345 // Pre-compute security context BEFORE acquiring DashMap write lock
2346 // to avoid deadlock — build_ws_runtime_security_context internally
2347 // calls sessions.get()/get_mut() which would re-enter the lock.
2348 let pre_eval_ctx = state
2349 .sessions
2350 .get(&session_id)
2351 .map(|session| EvaluationContext {
2352 timestamp: None,
2353 agent_id: session.oauth_subject.clone(),
2354 agent_identity: session.agent_identity.clone(),
2355 call_chain: session.current_call_chain.clone(),
2356 ..EvaluationContext::default()
2357 })
2358 .unwrap_or_else(|| EvaluationContext {
2359 agent_id: oauth_claims.as_ref().map(|claims| claims.sub.clone()),
2360 ..EvaluationContext::default()
2361 });
2362 let security_context = build_ws_runtime_security_context(
2363 &parsed,
2364 &action,
2365 &handshake_headers,
2366 super::helpers::TransportSecurityInputs {
2367 oauth_evidence: oauth_claims.as_ref(),
2368 eval_ctx: Some(&pre_eval_ctx),
2369 sessions: &state.sessions,
2370 session_id: Some(&session_id),
2371 trusted_request_signers: &state.trusted_request_signers,
2372 detached_signature_freshness: state.detached_signature_freshness,
2373 },
2374 );
2375
2376 // TOCTOU-safe evaluation + session update (write lock)
2377 let (mediation_result, ctx) = if let Some(mut session) =
2378 state.sessions.get_mut(&session_id)
2379 {
2380 let ctx = EvaluationContext {
2381 timestamp: None,
2382 agent_id: session.oauth_subject.clone(),
2383 agent_identity: session.agent_identity.clone(),
2384 call_counts: session.call_counts.clone(),
2385 previous_actions: session.action_history.iter().cloned().collect(),
2386 call_chain: session.current_call_chain.clone(),
2387 tenant_id: None,
2388 verification_tier: None,
2389 capability_token: None,
2390 session_state: None,
2391 };
2392 let result = mediate_with_security_context(
2393 &uuid::Uuid::new_v4().to_string().replace('-', ""),
2394 &action,
2395 &state.engine,
2396 Some(&ctx),
2397 security_context.as_ref(),
2398 "websocket",
2399 &state.mediation_config,
2400 Some(&session_id),
2401 None,
2402 );
2403
2404 // Atomically update session on Allow
2405 if matches!(result.verdict, Verdict::Allow) {
2406 session.touch();
2407 use crate::proxy::call_chain::{
2408 MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
2409 };
2410 let resource_key = format!(
2411 "resources/read:{}",
2412 uri.chars().take(128).collect::<String>()
2413 );
2414 if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
2415 || session.call_counts.contains_key(&resource_key)
2416 {
2417 let count =
2418 session.call_counts.entry(resource_key).or_insert(0);
2419 *count = count.saturating_add(1);
2420 }
2421 if session.action_history.len() >= MAX_ACTION_HISTORY {
2422 session.action_history.pop_front();
2423 }
2424 session
2425 .action_history
2426 .push_back("resources/read".to_string());
2427 }
2428
2429 (result, ctx)
2430 } else {
2431 let ctx = EvaluationContext::default();
2432 let result = mediate_with_security_context(
2433 &uuid::Uuid::new_v4().to_string().replace('-', ""),
2434 &action,
2435 &state.engine,
2436 None,
2437 security_context.as_ref(),
2438 "websocket",
2439 &state.mediation_config,
2440 Some(&session_id),
2441 None,
2442 );
2443 (result, ctx)
2444 };
2445
2446 let mut final_origin = mediation_result.origin;
2447 let mut acis_envelope = mediation_result.envelope.clone();
2448 let mut refresh_envelope = false;
2449 let verdict = match mediation_result.verdict {
2450 Verdict::RequireApproval { reason } => {
2451 match super::helpers::presented_approval_matches_action(
2452 &state,
2453 &session_id,
2454 presented_approval_id.as_deref(),
2455 &action,
2456 )
2457 .await
2458 {
2459 Ok(Some(approval_id)) => {
2460 matched_approval_id = Some(approval_id);
2461 final_origin = DecisionOrigin::PolicyEngine;
2462 refresh_envelope = true;
2463 Verdict::Allow
2464 }
2465 Ok(None) => Verdict::RequireApproval { reason },
2466 Err(()) => {
2467 final_origin = DecisionOrigin::ApprovalGate;
2468 refresh_envelope = true;
2469 Verdict::Deny {
2470 reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
2471 }
2472 }
2473 }
2474 }
2475 other => other,
2476 };
2477 if refresh_envelope {
2478 acis_envelope = refresh_ws_acis_envelope(
2479 &acis_envelope,
2480 &action,
2481 &verdict,
2482 final_origin,
2483 &session_id,
2484 &ctx,
2485 security_context.as_ref(),
2486 );
2487 }
2488
2489 match verdict {
2490 Verdict::Allow => {
2491 // SECURITY (FIND-R116-002): ABAC refinement for resource reads.
2492 // Parity with HTTP handler (handlers.rs:1783) and gRPC (service.rs:972).
2493 if let Some(ref abac) = state.abac_engine {
2494 let principal_id =
2495 ctx.agent_id.as_deref().unwrap_or("anonymous");
2496 let principal_type = ctx.principal_type();
2497 let session_risk = state
2498 .sessions
2499 .get_mut(&session_id)
2500 .and_then(|s| s.risk_score.clone());
2501 let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
2502 eval_ctx: &ctx,
2503 principal_type,
2504 principal_id,
2505 risk_score: session_risk.as_ref(),
2506 };
2507 match abac.evaluate(&action, &abac_ctx) {
2508 vellaveto_engine::abac::AbacDecision::Deny {
2509 policy_id,
2510 reason,
2511 } => {
2512 let deny_verdict = Verdict::Deny {
2513 reason: reason.clone(),
2514 };
2515 let abac_security_context =
2516 super::helpers::abac_deny_security_context(&action);
2517 let envelope =
2518 build_secondary_acis_envelope_with_security_context(
2519 &action,
2520 &deny_verdict,
2521 DecisionOrigin::PolicyEngine,
2522 "websocket",
2523 Some(&session_id),
2524 Some(&abac_security_context),
2525 );
2526 if let Err(e) = state
2527 .audit
2528 .log_entry_with_acis(
2529 &action,
2530 &deny_verdict,
2531 json!({
2532 "source": "ws_proxy",
2533 "session": session_id,
2534 "transport": "websocket",
2535 "event": "abac_deny",
2536 "abac_policy": policy_id,
2537 "uri": uri,
2538 }),
2539 envelope,
2540 )
2541 .await
2542 {
2543 tracing::warn!(
2544 "Failed to audit WS resource ABAC deny: {}",
2545 e
2546 );
2547 }
2548 let error_resp = make_ws_error_response(
2549 Some(id),
2550 -32001,
2551 "Denied by policy",
2552 );
2553 let mut sink = client_sink.lock().await;
2554 let _ =
2555 sink.send(Message::Text(error_resp.into())).await;
2556 continue;
2557 }
2558 vellaveto_engine::abac::AbacDecision::Allow {
2559 policy_id,
2560 } => {
2561 // SECURITY (FIND-R192-002): record_usage parity.
2562 if let Some(ref la) = state.least_agency {
2563 la.record_usage(
2564 principal_id,
2565 &session_id,
2566 &policy_id,
2567 uri,
2568 &action.function,
2569 );
2570 }
2571 }
2572 vellaveto_engine::abac::AbacDecision::NoMatch => {
2573 // Fall through — existing Allow stands
2574 }
2575 #[allow(unreachable_patterns)]
2576 _ => {
2577 tracing::warn!(
2578 "Unknown AbacDecision variant in WS resource_read — fail-closed"
2579 );
2580 let error_resp = make_ws_error_response(
2581 Some(id),
2582 -32001,
2583 "Denied by policy",
2584 );
2585 let mut sink = client_sink.lock().await;
2586 let _ =
2587 sink.send(Message::Text(error_resp.into())).await;
2588 continue;
2589 }
2590 }
2591 }
2592
2593 // NOTE: Session touch + call_counts/action_history
2594 // update already performed inside the TOCTOU-safe
2595 // block above (FIND-R130-002). No separate update here.
2596
2597 if super::helpers::consume_presented_approval(
2598 &state,
2599 &session_id,
2600 matched_approval_id.as_deref(),
2601 &action,
2602 security_context.as_ref(),
2603 )
2604 .await
2605 .is_err()
2606 {
2607 let deny_verdict = Verdict::Deny {
2608 reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
2609 };
2610 let invalid_approval_security_context =
2611 super::helpers::invalid_presented_approval_security_context(
2612 &action,
2613 );
2614 let combined_security_context =
2615 super::helpers::merge_transport_security_context(
2616 security_context.as_ref(),
2617 Some(&invalid_approval_security_context),
2618 );
2619 let effective_security_context = combined_security_context
2620 .as_ref()
2621 .unwrap_or(&invalid_approval_security_context);
2622 let envelope =
2623 build_secondary_acis_envelope_with_security_context(
2624 &action,
2625 &deny_verdict,
2626 DecisionOrigin::ApprovalGate,
2627 "websocket",
2628 Some(&session_id),
2629 Some(effective_security_context),
2630 );
2631 let _ = state
2632 .audit
2633 .log_entry_with_acis(
2634 &action,
2635 &deny_verdict,
2636 json!({
2637 "source": "ws_proxy",
2638 "session": session_id,
2639 "transport": "websocket",
2640 "event": "presented_approval_replay_denied",
2641 "approval_id": matched_approval_id,
2642 "uri": uri,
2643 }),
2644 envelope,
2645 )
2646 .await;
2647 let error_resp = make_ws_error_response(
2648 Some(id),
2649 -32001,
2650 "Denied by policy",
2651 );
2652 let mut sink = client_sink.lock().await;
2653 let _ = sink.send(Message::Text(error_resp.into())).await;
2654 continue;
2655 }
2656
2657 if let Err(e) = state
2658 .audit
2659 .log_entry_with_acis(
2660 &action,
2661 &Verdict::Allow,
2662 json!({
2663 "source": "ws_proxy",
2664 "session": session_id,
2665 "transport": "websocket",
2666 "resource_uri": uri,
2667 }),
2668 acis_envelope,
2669 )
2670 .await
2671 {
2672 tracing::error!(
2673 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2674 e
2675 );
2676 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2677 if state.audit_strict_mode {
2678 let error = make_ws_error_response(
2679 Some(id),
2680 -32000,
2681 "Audit logging failed — request denied (strict audit mode)",
2682 );
2683 let mut sink = client_sink.lock().await;
2684 let _ = sink.send(Message::Text(error.into())).await;
2685 continue;
2686 }
2687 }
2688
2689 // SECURITY (FIND-R46-011): Fail-closed on canonicalization
2690 // failure. Do NOT fall back to original text.
2691 let forward_text = if state.canonicalize {
2692 match serde_json::to_string(&parsed) {
2693 Ok(canonical) => canonical,
2694 Err(e) => {
2695 tracing::error!(
2696 "SECURITY: WS resource canonicalization failed: {}",
2697 e
2698 );
2699 let error_resp = make_ws_error_response(
2700 Some(id),
2701 -32603,
2702 "Internal error",
2703 );
2704 let mut sink = client_sink.lock().await;
2705 let _ =
2706 sink.send(Message::Text(error_resp.into())).await;
2707 continue;
2708 }
2709 }
2710 } else {
2711 text.to_string()
2712 };
2713 let mut sink = upstream_sink.lock().await;
2714 if let Err(e) = sink
2715 .send(tokio_tungstenite::tungstenite::Message::Text(
2716 forward_text.into(),
2717 ))
2718 .await
2719 {
2720 tracing::error!("Failed to forward resource read: {}", e);
2721 break;
2722 }
2723 }
2724 // SECURITY (FIND-R116-009): Separate handling for Deny vs RequireApproval
2725 // with per-verdict audit logging. Parity with gRPC (service.rs:1051-1076).
2726 Verdict::Deny { ref reason } => {
2727 let mut audit_metadata = json!({
2728 "source": "ws_proxy",
2729 "session": session_id,
2730 "transport": "websocket",
2731 "resource_uri": uri,
2732 });
2733 if reason == INVALID_PRESENTED_APPROVAL_REASON
2734 && presented_approval_id.is_some()
2735 {
2736 audit_metadata["event"] =
2737 json!("presented_approval_replay_denied");
2738 audit_metadata["approval_id"] = json!(presented_approval_id);
2739 }
2740 if let Err(e) = state
2741 .audit
2742 .log_entry_with_acis(
2743 &action,
2744 &Verdict::Deny {
2745 reason: reason.clone(),
2746 },
2747 audit_metadata,
2748 acis_envelope,
2749 )
2750 .await
2751 {
2752 tracing::error!(
2753 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2754 e
2755 );
2756 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2757 if state.audit_strict_mode {
2758 let error = make_ws_error_response(
2759 Some(id),
2760 -32000,
2761 "Audit logging failed — request denied (strict audit mode)",
2762 );
2763 let mut sink = client_sink.lock().await;
2764 let _ = sink.send(Message::Text(error.into())).await;
2765 continue;
2766 }
2767 }
2768 let error =
2769 make_ws_error_response(Some(id), -32001, "Denied by policy");
2770 let mut sink = client_sink.lock().await;
2771 let _ = sink.send(Message::Text(error.into())).await;
2772 }
2773 Verdict::RequireApproval { ref reason, .. } => {
2774 let approval_verdict = Verdict::RequireApproval {
2775 reason: reason.clone(),
2776 };
2777 let containment_context =
2778 super::helpers::approval_containment_context_from_envelope(
2779 &acis_envelope,
2780 reason,
2781 );
2782 if let Err(e) = state
2783 .audit
2784 .log_entry_with_acis(
2785 &action,
2786 &approval_verdict,
2787 json!({
2788 "source": "ws_proxy",
2789 "session": session_id,
2790 "transport": "websocket",
2791 "resource_uri": uri,
2792 "event": "require_approval",
2793 }),
2794 acis_envelope,
2795 )
2796 .await
2797 {
2798 tracing::error!(
2799 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
2800 e
2801 );
2802 // SECURITY (FIND-CREATIVE-003): Strict audit mode — fail-closed.
2803 if state.audit_strict_mode {
2804 let error = make_ws_error_response(
2805 Some(id),
2806 -32000,
2807 "Audit logging failed — request denied (strict audit mode)",
2808 );
2809 let mut sink = client_sink.lock().await;
2810 let _ = sink.send(Message::Text(error.into())).await;
2811 continue;
2812 }
2813 }
2814 let approval_id =
2815 super::helpers::create_pending_approval_with_context(
2816 &state,
2817 &session_id,
2818 &action,
2819 reason,
2820 containment_context,
2821 )
2822 .await;
2823 let error = make_ws_error_response_with_data(
2824 Some(id),
2825 -32001,
2826 "Approval required",
2827 Some(json!({
2828 "verdict": "require_approval",
2829 "reason": reason,
2830 "approval_id": approval_id,
2831 })),
2832 );
2833 let mut sink = client_sink.lock().await;
2834 let _ = sink.send(Message::Text(error.into())).await;
2835 }
2836 #[allow(unreachable_patterns)]
2837 _ => {
2838 // SECURITY: Future variants — fail-closed.
2839 tracing::warn!(
2840 "Unknown Verdict variant in WS resource_read — fail-closed"
2841 );
2842 let error =
2843 make_ws_error_response(Some(id), -32001, "Denied by policy");
2844 let mut sink = client_sink.lock().await;
2845 let _ = sink.send(Message::Text(error.into())).await;
2846 }
2847 }
2848 }
2849 MessageType::Batch => {
2850 // Reject batches per MCP spec
2851 let action = Action::new(
2852 "vellaveto",
2853 "batch_rejected",
2854 json!({
2855 "session": session_id,
2856 "transport": "websocket",
2857 }),
2858 );
2859 let batch_verdict = Verdict::Deny {
2860 reason: "JSON-RPC batching not supported".to_string(),
2861 };
2862 let batch_security_context =
2863 super::helpers::batch_rejection_security_context(&action);
2864 let envelope = build_secondary_acis_envelope_with_security_context(
2865 &action,
2866 &batch_verdict,
2867 DecisionOrigin::PolicyEngine,
2868 "websocket",
2869 Some(&session_id),
2870 Some(&batch_security_context),
2871 );
2872 if let Err(e) = state
2873 .audit
2874 .log_entry_with_acis(
2875 &action,
2876 &batch_verdict,
2877 json!({
2878 "source": "ws_proxy",
2879 "event": "batch_rejected",
2880 }),
2881 envelope,
2882 )
2883 .await
2884 {
2885 tracing::warn!("Failed to audit WS batch rejection: {}", e);
2886 }
2887 let error = json!({
2888 "jsonrpc": "2.0",
2889 "error": {
2890 "code": -32600,
2891 "message": "JSON-RPC batch requests are not supported"
2892 },
2893 "id": null
2894 });
2895 let error_text = serde_json::to_string(&error)
2896 .unwrap_or_else(|_| r#"{"jsonrpc":"2.0","error":{"code":-32600,"message":"Batch not supported"},"id":null}"#.to_string());
2897 let mut sink = client_sink.lock().await;
2898 let _ = sink.send(Message::Text(error_text.into())).await;
2899 }
2900 MessageType::Invalid { ref id, ref reason } => {
2901 tracing::warn!(
2902 "Invalid JSON-RPC request in WebSocket transport: {}",
2903 reason
2904 );
2905 let error =
2906 make_ws_error_response(Some(id), -32600, "Invalid JSON-RPC request");
2907 let mut sink = client_sink.lock().await;
2908 let _ = sink.send(Message::Text(error.into())).await;
2909 }
2910 MessageType::SamplingRequest { ref id } => {
2911 // SECURITY (FIND-R74-006): Call inspect_sampling() for full
2912 // verdict (enabled + model filter + tool output check + rate limit),
2913 // matching HTTP handler parity (handlers.rs:1681).
2914 let params = parsed.get("params").cloned().unwrap_or(json!({}));
2915 // SECURITY (FIND-R125-001): Per-session sampling rate limit
2916 // parity with elicitation. Atomically read + increment.
2917 let sampling_verdict = {
2918 let mut session_ref = state.sessions.get_mut(&session_id);
2919 let current_count =
2920 session_ref.as_ref().map(|s| s.sampling_count).unwrap_or(0);
2921 let verdict = vellaveto_mcp::elicitation::inspect_sampling(
2922 ¶ms,
2923 &state.sampling_config,
2924 current_count,
2925 );
2926 if matches!(verdict, vellaveto_mcp::elicitation::SamplingVerdict::Allow)
2927 {
2928 if let Some(ref mut s) = session_ref {
2929 s.sampling_count = s.sampling_count.saturating_add(1);
2930 }
2931 }
2932 verdict
2933 };
2934 match sampling_verdict {
2935 vellaveto_mcp::elicitation::SamplingVerdict::Allow => {
2936 // Forward allowed sampling request
2937 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
2938 // Falling back to original text would create a TOCTOU gap.
2939 let forward_text = if state.canonicalize {
2940 match serde_json::to_string(&parsed) {
2941 Ok(canonical) => canonical,
2942 Err(e) => {
2943 tracing::error!(
2944 "SECURITY: WS sampling canonicalization failed: {}",
2945 e
2946 );
2947 let error_resp = make_ws_error_response(
2948 Some(id),
2949 -32603,
2950 "Internal error",
2951 );
2952 let mut sink = client_sink.lock().await;
2953 let _ =
2954 sink.send(Message::Text(error_resp.into())).await;
2955 continue;
2956 }
2957 }
2958 } else {
2959 text.to_string()
2960 };
2961 let mut sink = upstream_sink.lock().await;
2962 let _ = sink
2963 .send(tokio_tungstenite::tungstenite::Message::Text(
2964 forward_text.into(),
2965 ))
2966 .await;
2967 }
2968 vellaveto_mcp::elicitation::SamplingVerdict::Deny { reason } => {
2969 tracing::warn!(
2970 session_id = %session_id,
2971 "Blocked WS sampling/createMessage: {}",
2972 reason
2973 );
2974 let action = Action::new(
2975 "vellaveto",
2976 "ws_sampling_interception",
2977 json!({
2978 "method": "sampling/createMessage",
2979 "session": session_id,
2980 "transport": "websocket",
2981 "reason": &reason,
2982 }),
2983 );
2984 let verdict = Verdict::Deny {
2985 reason: reason.clone(),
2986 };
2987 let sampling_security_context =
2988 super::helpers::sampling_interception_security_context(&action);
2989 let envelope = build_secondary_acis_envelope_with_security_context(
2990 &action,
2991 &verdict,
2992 DecisionOrigin::PolicyEngine,
2993 "websocket",
2994 Some(&session_id),
2995 Some(&sampling_security_context),
2996 );
2997 if let Err(e) = state
2998 .audit
2999 .log_entry_with_acis(
3000 &action,
3001 &verdict,
3002 json!({
3003 "source": "ws_proxy",
3004 "event": "ws_sampling_interception",
3005 }),
3006 envelope,
3007 )
3008 .await
3009 {
3010 tracing::warn!(
3011 "Failed to audit WS sampling interception: {}",
3012 e
3013 );
3014 }
3015 // SECURITY: Generic message to client — detailed reason
3016 // is in the audit log, not leaked to the client.
3017 let error = make_ws_error_response(
3018 Some(id),
3019 -32001,
3020 "sampling/createMessage blocked by policy",
3021 );
3022 let mut sink = client_sink.lock().await;
3023 let _ = sink.send(Message::Text(error.into())).await;
3024 }
3025 }
3026 }
3027 MessageType::TaskRequest {
3028 ref id,
3029 ref task_method,
3030 ref task_id,
3031 } => {
3032 // SECURITY (FIND-R76-001): Memory poisoning detection on task params.
3033 // Parity with HTTP handler (handlers.rs:2027-2084). Agents could
3034 // exfiltrate poisoned data via task management operations.
3035 {
3036 let task_params = parsed.get("params").cloned().unwrap_or(json!({}));
3037 let poisoning_detected = state
3038 .sessions
3039 .get_mut(&session_id)
3040 .and_then(|session| {
3041 let matches =
3042 session.memory_tracker.check_parameters(&task_params);
3043 if !matches.is_empty() {
3044 for m in &matches {
3045 tracing::warn!(
3046 "SECURITY: Memory poisoning detected in WS task '{}' (session {}): \
3047 param '{}' contains replayed data (fingerprint: {})",
3048 task_method,
3049 session_id,
3050 m.param_location,
3051 m.fingerprint
3052 );
3053 }
3054 Some(matches.len())
3055 } else {
3056 None
3057 }
3058 });
3059 if let Some(match_count) = poisoning_detected {
3060 let poison_action =
3061 extractor::extract_task_action(task_method, task_id.as_deref());
3062 let deny_reason = format!(
3063 "Memory poisoning detected: {match_count} replayed data fragment(s) in task '{task_method}'"
3064 );
3065 let task_poison_verdict = Verdict::Deny {
3066 reason: deny_reason,
3067 };
3068 let poisoning_security_context =
3069 super::helpers::memory_poisoning_security_context(
3070 &task_params,
3071 "memory_poisoning",
3072 );
3073 let envelope = build_secondary_acis_envelope_with_security_context(
3074 &poison_action,
3075 &task_poison_verdict,
3076 DecisionOrigin::MemoryPoisoning,
3077 "websocket",
3078 Some(&session_id),
3079 Some(&poisoning_security_context),
3080 );
3081 if let Err(e) = state
3082 .audit
3083 .log_entry_with_acis(
3084 &poison_action,
3085 &task_poison_verdict,
3086 json!({
3087 "source": "ws_proxy",
3088 "session": session_id,
3089 "transport": "websocket",
3090 "event": "memory_poisoning_detected",
3091 "matches": match_count,
3092 "task_method": task_method,
3093 }),
3094 envelope,
3095 )
3096 .await
3097 {
3098 tracing::warn!(
3099 "Failed to audit WS task memory poisoning: {}",
3100 e
3101 );
3102 }
3103 let error = make_ws_error_response(
3104 Some(id),
3105 -32001,
3106 "Request blocked: security policy violation",
3107 );
3108 let mut sink = client_sink.lock().await;
3109 let _ = sink.send(Message::Text(error.into())).await;
3110 continue;
3111 }
3112 }
3113
3114 // SECURITY (FIND-R76-001): DLP scan task request parameters.
3115 // Parity with HTTP handler (handlers.rs:2086-2145). Agents could
3116 // embed secrets in task_id or params to exfiltrate them.
3117 {
3118 let task_params = parsed.get("params").cloned().unwrap_or(json!({}));
3119 let dlp_findings = scan_parameters_for_secrets(&task_params);
3120 if !dlp_findings.is_empty() {
3121 for finding in &dlp_findings {
3122 record_dlp_finding(&finding.pattern_name);
3123 }
3124 let patterns: Vec<String> = dlp_findings
3125 .iter()
3126 .map(|f| format!("{} at {}", f.pattern_name, f.location))
3127 .collect();
3128 tracing::warn!(
3129 "SECURITY: DLP blocking WS task '{}' in session {}: {:?}",
3130 task_method,
3131 session_id,
3132 patterns
3133 );
3134 let dlp_action =
3135 extractor::extract_task_action(task_method, task_id.as_deref());
3136 let task_dlp_verdict = Verdict::Deny {
3137 reason: format!(
3138 "DLP: secrets detected in task request: {patterns:?}"
3139 ),
3140 };
3141 let parameter_security_context =
3142 super::helpers::parameter_dlp_security_context(
3143 &task_params,
3144 true,
3145 "task_parameter_dlp",
3146 );
3147 let envelope = build_secondary_acis_envelope_with_security_context(
3148 &dlp_action,
3149 &task_dlp_verdict,
3150 DecisionOrigin::Dlp,
3151 "websocket",
3152 Some(&session_id),
3153 Some(¶meter_security_context),
3154 );
3155 if let Err(e) = state
3156 .audit
3157 .log_entry_with_acis(
3158 &dlp_action,
3159 &task_dlp_verdict,
3160 json!({
3161 "source": "ws_proxy",
3162 "session": session_id,
3163 "transport": "websocket",
3164 "event": "dlp_secret_detected_task",
3165 "task_method": task_method,
3166 "findings": patterns,
3167 }),
3168 envelope,
3169 )
3170 .await
3171 {
3172 tracing::warn!("Failed to audit WS task DLP: {}", e);
3173 }
3174 let error = make_ws_error_response(
3175 Some(id),
3176 -32001,
3177 "Request blocked: security policy violation",
3178 );
3179 let mut sink = client_sink.lock().await;
3180 let _ = sink.send(Message::Text(error.into())).await;
3181 continue;
3182 }
3183 }
3184
3185 // Policy-evaluate task requests (async operations)
3186 let action =
3187 extractor::extract_task_action(task_method, task_id.as_deref());
3188 let mut matched_approval_id: Option<String> = None;
3189 // SECURITY (FIND-R130-002): TOCTOU-safe context+eval for task
3190 // requests. Context is built inside the DashMap shard lock to
3191 // prevent stale snapshot evaluation races.
3192 // SECURITY (FIND-R190-006): Update session state on Allow
3193 // (touch + call_counts + action_history) while still holding
3194 // the shard lock, matching ToolCall/ResourceRead parity.
3195
3196 // Pre-compute security context BEFORE acquiring DashMap write lock
3197 // to avoid deadlock — build_ws_runtime_security_context internally
3198 // calls sessions.get()/get_mut() which would re-enter the lock.
3199 let pre_eval_ctx = state
3200 .sessions
3201 .get(&session_id)
3202 .map(|session| EvaluationContext {
3203 timestamp: None,
3204 agent_id: session.oauth_subject.clone(),
3205 agent_identity: session.agent_identity.clone(),
3206 call_chain: session.current_call_chain.clone(),
3207 ..EvaluationContext::default()
3208 })
3209 .unwrap_or_else(|| EvaluationContext {
3210 agent_id: oauth_claims.as_ref().map(|claims| claims.sub.clone()),
3211 ..EvaluationContext::default()
3212 });
3213 let security_context = build_ws_runtime_security_context(
3214 &parsed,
3215 &action,
3216 &handshake_headers,
3217 super::helpers::TransportSecurityInputs {
3218 oauth_evidence: oauth_claims.as_ref(),
3219 eval_ctx: Some(&pre_eval_ctx),
3220 sessions: &state.sessions,
3221 session_id: Some(&session_id),
3222 trusted_request_signers: &state.trusted_request_signers,
3223 detached_signature_freshness: state.detached_signature_freshness,
3224 },
3225 );
3226
3227 // TOCTOU-safe evaluation + session update (write lock)
3228 let (mediation_result, task_eval_ctx) = if let Some(mut session) =
3229 state.sessions.get_mut(&session_id)
3230 {
3231 let ctx = EvaluationContext {
3232 timestamp: None,
3233 agent_id: session.oauth_subject.clone(),
3234 agent_identity: session.agent_identity.clone(),
3235 call_counts: session.call_counts.clone(),
3236 previous_actions: session.action_history.iter().cloned().collect(),
3237 call_chain: session.current_call_chain.clone(),
3238 tenant_id: None,
3239 verification_tier: None,
3240 capability_token: None,
3241 session_state: None,
3242 };
3243 let result = mediate_with_security_context(
3244 &uuid::Uuid::new_v4().to_string().replace('-', ""),
3245 &action,
3246 &state.engine,
3247 Some(&ctx),
3248 security_context.as_ref(),
3249 "websocket",
3250 &state.mediation_config,
3251 Some(&session_id),
3252 None,
3253 );
3254
3255 // Update session atomically on Allow
3256 if matches!(result.verdict, Verdict::Allow) {
3257 session.touch();
3258 use crate::proxy::call_chain::{
3259 MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
3260 };
3261 if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
3262 || session.call_counts.contains_key(task_method)
3263 {
3264 let count = session
3265 .call_counts
3266 .entry(task_method.to_string())
3267 .or_insert(0);
3268 *count = count.saturating_add(1);
3269 }
3270 if session.action_history.len() >= MAX_ACTION_HISTORY {
3271 session.action_history.pop_front();
3272 }
3273 session.action_history.push_back(task_method.to_string());
3274 }
3275
3276 (result, ctx)
3277 } else {
3278 let ctx = EvaluationContext::default();
3279 let result = mediate_with_security_context(
3280 &uuid::Uuid::new_v4().to_string().replace('-', ""),
3281 &action,
3282 &state.engine,
3283 None,
3284 security_context.as_ref(),
3285 "websocket",
3286 &state.mediation_config,
3287 Some(&session_id),
3288 None,
3289 );
3290 (result, ctx)
3291 };
3292
3293 let mut final_origin = mediation_result.origin;
3294 let mut acis_envelope = mediation_result.envelope.clone();
3295 let mut refresh_envelope = false;
3296 let verdict = match mediation_result.verdict {
3297 Verdict::RequireApproval { reason } => {
3298 match super::helpers::presented_approval_matches_action(
3299 &state,
3300 &session_id,
3301 presented_approval_id.as_deref(),
3302 &action,
3303 )
3304 .await
3305 {
3306 Ok(Some(approval_id)) => {
3307 matched_approval_id = Some(approval_id);
3308 final_origin = DecisionOrigin::PolicyEngine;
3309 refresh_envelope = true;
3310 Verdict::Allow
3311 }
3312 Ok(None) => Verdict::RequireApproval { reason },
3313 Err(()) => {
3314 final_origin = DecisionOrigin::ApprovalGate;
3315 refresh_envelope = true;
3316 Verdict::Deny {
3317 reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
3318 }
3319 }
3320 }
3321 }
3322 other => other,
3323 };
3324 if refresh_envelope {
3325 acis_envelope = refresh_ws_acis_envelope(
3326 &acis_envelope,
3327 &action,
3328 &verdict,
3329 final_origin,
3330 &session_id,
3331 &task_eval_ctx,
3332 security_context.as_ref(),
3333 );
3334 }
3335
3336 match verdict {
3337 Verdict::Allow => {
3338 // SECURITY (FIND-R190-001): ABAC refinement for TaskRequest,
3339 // matching ToolCall/ResourceRead parity.
3340 if let Some(ref abac) = state.abac_engine {
3341 let principal_id =
3342 task_eval_ctx.agent_id.as_deref().unwrap_or("anonymous");
3343 let principal_type = task_eval_ctx.principal_type();
3344 let session_risk = state
3345 .sessions
3346 .get_mut(&session_id)
3347 .and_then(|s| s.risk_score.clone());
3348 let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
3349 eval_ctx: &task_eval_ctx,
3350 principal_type,
3351 principal_id,
3352 risk_score: session_risk.as_ref(),
3353 };
3354 match abac.evaluate(&action, &abac_ctx) {
3355 vellaveto_engine::abac::AbacDecision::Deny {
3356 policy_id,
3357 reason,
3358 } => {
3359 let deny_verdict = Verdict::Deny {
3360 reason: reason.clone(),
3361 };
3362 let abac_security_context =
3363 super::helpers::abac_deny_security_context(&action);
3364 let envelope =
3365 build_secondary_acis_envelope_with_security_context(
3366 &action,
3367 &deny_verdict,
3368 DecisionOrigin::PolicyEngine,
3369 "websocket",
3370 Some(&session_id),
3371 Some(&abac_security_context),
3372 );
3373 if let Err(e) = state
3374 .audit
3375 .log_entry_with_acis(
3376 &action,
3377 &deny_verdict,
3378 json!({
3379 "source": "ws_proxy",
3380 "session": session_id,
3381 "transport": "websocket",
3382 "event": "abac_deny",
3383 "abac_policy": policy_id,
3384 "task_method": task_method,
3385 }),
3386 envelope,
3387 )
3388 .await
3389 {
3390 tracing::warn!(
3391 "Failed to audit WS task ABAC deny: {}",
3392 e
3393 );
3394 }
3395 let error_resp = make_ws_error_response(
3396 Some(id),
3397 -32001,
3398 "Denied by policy",
3399 );
3400 let mut sink = client_sink.lock().await;
3401 let _ =
3402 sink.send(Message::Text(error_resp.into())).await;
3403 continue;
3404 }
3405 vellaveto_engine::abac::AbacDecision::Allow {
3406 policy_id,
3407 } => {
3408 if let Some(ref la) = state.least_agency {
3409 la.record_usage(
3410 principal_id,
3411 &session_id,
3412 &policy_id,
3413 task_method,
3414 &action.function,
3415 );
3416 }
3417 }
3418 vellaveto_engine::abac::AbacDecision::NoMatch => {
3419 // Fall through — existing Allow stands
3420 }
3421 #[allow(unreachable_patterns)]
3422 _ => {
3423 tracing::warn!(
3424 "Unknown AbacDecision variant — fail-closed"
3425 );
3426 let error_resp = make_ws_error_response(
3427 Some(id),
3428 -32001,
3429 "Denied by policy",
3430 );
3431 let mut sink = client_sink.lock().await;
3432 let _ =
3433 sink.send(Message::Text(error_resp.into())).await;
3434 continue;
3435 }
3436 }
3437 }
3438
3439 if super::helpers::consume_presented_approval(
3440 &state,
3441 &session_id,
3442 matched_approval_id.as_deref(),
3443 &action,
3444 security_context.as_ref(),
3445 )
3446 .await
3447 .is_err()
3448 {
3449 let deny_verdict = Verdict::Deny {
3450 reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
3451 };
3452 let invalid_approval_security_context =
3453 super::helpers::invalid_presented_approval_security_context(
3454 &action,
3455 );
3456 let combined_security_context =
3457 super::helpers::merge_transport_security_context(
3458 security_context.as_ref(),
3459 Some(&invalid_approval_security_context),
3460 );
3461 let effective_security_context = combined_security_context
3462 .as_ref()
3463 .unwrap_or(&invalid_approval_security_context);
3464 let envelope =
3465 build_secondary_acis_envelope_with_security_context(
3466 &action,
3467 &deny_verdict,
3468 DecisionOrigin::ApprovalGate,
3469 "websocket",
3470 Some(&session_id),
3471 Some(effective_security_context),
3472 );
3473 let _ = state
3474 .audit
3475 .log_entry_with_acis(
3476 &action,
3477 &deny_verdict,
3478 json!({
3479 "source": "ws_proxy",
3480 "session": session_id,
3481 "transport": "websocket",
3482 "event": "presented_approval_replay_denied",
3483 "approval_id": matched_approval_id,
3484 "task_method": task_method,
3485 "task_id": task_id,
3486 }),
3487 envelope,
3488 )
3489 .await;
3490 let error_resp = make_ws_error_response(
3491 Some(id),
3492 -32001,
3493 "Denied by policy",
3494 );
3495 let mut sink = client_sink.lock().await;
3496 let _ = sink.send(Message::Text(error_resp.into())).await;
3497 continue;
3498 }
3499
3500 if let Err(e) = state
3501 .audit
3502 .log_entry_with_acis(
3503 &action,
3504 &Verdict::Allow,
3505 json!({
3506 "source": "ws_proxy",
3507 "session": session_id,
3508 "transport": "websocket",
3509 "task_method": task_method,
3510 }),
3511 acis_envelope,
3512 )
3513 .await
3514 {
3515 tracing::warn!("Failed to audit WS task allow: {}", e);
3516 }
3517 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
3518 let forward_text = if state.canonicalize {
3519 match serde_json::to_string(&parsed) {
3520 Ok(canonical) => canonical,
3521 Err(e) => {
3522 tracing::error!(
3523 "SECURITY: WS task canonicalization failed: {}",
3524 e
3525 );
3526 let error_resp = make_ws_error_response(
3527 Some(id),
3528 -32603,
3529 "Internal error",
3530 );
3531 let mut sink = client_sink.lock().await;
3532 let _ =
3533 sink.send(Message::Text(error_resp.into())).await;
3534 continue;
3535 }
3536 }
3537 } else {
3538 text.to_string()
3539 };
3540 let mut sink = upstream_sink.lock().await;
3541 if let Err(e) = sink
3542 .send(tokio_tungstenite::tungstenite::Message::Text(
3543 forward_text.into(),
3544 ))
3545 .await
3546 {
3547 tracing::error!("Failed to forward task request: {}", e);
3548 break;
3549 }
3550 }
3551 Verdict::Deny { ref reason } => {
3552 let mut audit_metadata = json!({
3553 "source": "ws_proxy",
3554 "session": session_id,
3555 "transport": "websocket",
3556 "task_method": task_method,
3557 });
3558 if reason == INVALID_PRESENTED_APPROVAL_REASON
3559 && presented_approval_id.is_some()
3560 {
3561 audit_metadata["event"] =
3562 json!("presented_approval_replay_denied");
3563 audit_metadata["approval_id"] = json!(presented_approval_id);
3564 audit_metadata["task_id"] = json!(task_id);
3565 }
3566 if let Err(e) = state
3567 .audit
3568 .log_entry_with_acis(
3569 &action,
3570 &Verdict::Deny {
3571 reason: reason.clone(),
3572 },
3573 audit_metadata,
3574 acis_envelope,
3575 )
3576 .await
3577 {
3578 tracing::error!(
3579 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
3580 e
3581 );
3582 // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
3583 if state.audit_strict_mode {
3584 let error = make_ws_error_response(
3585 Some(id),
3586 -32000,
3587 "Audit logging failed — request denied (strict audit mode)",
3588 );
3589 let mut sink = client_sink.lock().await;
3590 let _ = sink.send(Message::Text(error.into())).await;
3591 continue;
3592 }
3593 }
3594 // SECURITY (FIND-R55-WS-005): Generic denial message to prevent
3595 // leaking policy names/details. Detailed reason is in audit log.
3596 let denial =
3597 make_ws_error_response(Some(id), -32001, "Denied by policy");
3598 let mut sink = client_sink.lock().await;
3599 let _ = sink.send(Message::Text(denial.into())).await;
3600 }
3601 Verdict::RequireApproval { ref reason, .. } => {
3602 let approval_verdict = Verdict::RequireApproval {
3603 reason: reason.clone(),
3604 };
3605 let containment_context =
3606 super::helpers::approval_containment_context_from_envelope(
3607 &acis_envelope,
3608 reason,
3609 );
3610 if let Err(e) = state
3611 .audit
3612 .log_entry_with_acis(
3613 &action,
3614 &approval_verdict,
3615 json!({
3616 "source": "ws_proxy",
3617 "session": session_id,
3618 "transport": "websocket",
3619 "task_method": task_method,
3620 }),
3621 acis_envelope,
3622 )
3623 .await
3624 {
3625 tracing::error!(
3626 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
3627 e
3628 );
3629 // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
3630 if state.audit_strict_mode {
3631 let error = make_ws_error_response(
3632 Some(id),
3633 -32000,
3634 "Audit logging failed — request denied (strict audit mode)",
3635 );
3636 let mut sink = client_sink.lock().await;
3637 let _ = sink.send(Message::Text(error.into())).await;
3638 continue;
3639 }
3640 }
3641 let approval_id =
3642 super::helpers::create_pending_approval_with_context(
3643 &state,
3644 &session_id,
3645 &action,
3646 reason,
3647 containment_context,
3648 )
3649 .await;
3650 let denial = make_ws_error_response_with_data(
3651 Some(id),
3652 -32001,
3653 "Approval required",
3654 Some(json!({
3655 "verdict": "require_approval",
3656 "reason": reason,
3657 "approval_id": approval_id,
3658 })),
3659 );
3660 let mut sink = client_sink.lock().await;
3661 let _ = sink.send(Message::Text(denial.into())).await;
3662 }
3663 _ => {
3664 let denial =
3665 make_ws_error_response(Some(id), -32001, "Denied by policy");
3666 let mut sink = client_sink.lock().await;
3667 let _ = sink.send(Message::Text(denial.into())).await;
3668 }
3669 }
3670 }
3671 MessageType::ExtensionMethod {
3672 ref id,
3673 ref extension_id,
3674 ref method,
3675 } => {
3676 // Policy-evaluate extension method calls
3677 let params = parsed.get("params").cloned().unwrap_or(json!({}));
3678
3679 // SECURITY (FIND-R116-001): DLP scan extension method parameters.
3680 // Parity with gRPC handle_extension_method (service.rs:1542).
3681 let dlp_findings = scan_parameters_for_secrets(¶ms);
3682 if !dlp_findings.is_empty() {
3683 for finding in &dlp_findings {
3684 record_dlp_finding(&finding.pattern_name);
3685 }
3686 let patterns: Vec<String> = dlp_findings
3687 .iter()
3688 .map(|f| format!("{}:{}", f.pattern_name, f.location))
3689 .collect();
3690 tracing::warn!(
3691 "SECURITY: Secrets in WS extension method parameters! Session: {}, Extension: {}:{}, Findings: {:?}",
3692 session_id, extension_id, method, patterns,
3693 );
3694 let action =
3695 extractor::extract_extension_action(extension_id, method, ¶ms);
3696 let audit_verdict = Verdict::Deny {
3697 reason: format!(
3698 "DLP blocked: secret detected in extension parameters: {patterns:?}"
3699 ),
3700 };
3701 let parameter_security_context =
3702 super::helpers::parameter_dlp_security_context(
3703 ¶ms,
3704 true,
3705 "extension_parameter_dlp",
3706 );
3707 let envelope = build_secondary_acis_envelope_with_security_context(
3708 &action,
3709 &audit_verdict,
3710 DecisionOrigin::Dlp,
3711 "websocket",
3712 Some(&session_id),
3713 Some(¶meter_security_context),
3714 );
3715 if let Err(e) = state.audit.log_entry_with_acis(
3716 &action, &audit_verdict,
3717 json!({
3718 "source": "ws_proxy", "session": session_id, "transport": "websocket",
3719 "event": "ws_extension_parameter_dlp_alert",
3720 "extension_id": extension_id, "method": method, "findings": patterns,
3721 }),
3722 envelope,
3723 ).await {
3724 tracing::warn!("Failed to audit WS extension parameter DLP: {}", e);
3725 }
3726 let denial =
3727 make_ws_error_response(Some(id), -32001, "Denied by policy");
3728 let mut sink = client_sink.lock().await;
3729 let _ = sink.send(Message::Text(denial.into())).await;
3730 continue;
3731 }
3732
3733 // SECURITY (FIND-R116-001): Memory poisoning detection for extension params.
3734 // Parity with gRPC handle_extension_method (service.rs:1574).
3735 if let Some(session) = state.sessions.get_mut(&session_id) {
3736 let poisoning_matches =
3737 session.memory_tracker.check_parameters(¶ms);
3738 if !poisoning_matches.is_empty() {
3739 for m in &poisoning_matches {
3740 tracing::warn!(
3741 "SECURITY: Memory poisoning in WS extension '{}:{}' (session {}): \
3742 param '{}' replayed data (fingerprint: {})",
3743 extension_id, method, session_id, m.param_location, m.fingerprint
3744 );
3745 }
3746 let action = extractor::extract_extension_action(
3747 extension_id,
3748 method,
3749 ¶ms,
3750 );
3751 let deny_reason = format!(
3752 "Memory poisoning detected: {} replayed data fragment(s) in extension '{}:{}'",
3753 poisoning_matches.len(), extension_id, method
3754 );
3755 let ext_poison_verdict = Verdict::Deny {
3756 reason: deny_reason.clone(),
3757 };
3758 let poisoning_security_context =
3759 super::helpers::memory_poisoning_security_context(
3760 ¶ms,
3761 "memory_poisoning",
3762 );
3763 let envelope = build_secondary_acis_envelope_with_security_context(
3764 &action,
3765 &ext_poison_verdict,
3766 DecisionOrigin::MemoryPoisoning,
3767 "websocket",
3768 Some(&session_id),
3769 Some(&poisoning_security_context),
3770 );
3771 if let Err(e) = state.audit.log_entry_with_acis(
3772 &action,
3773 &ext_poison_verdict,
3774 json!({
3775 "source": "ws_proxy", "session": session_id, "transport": "websocket",
3776 "event": "memory_poisoning_detected",
3777 "matches": poisoning_matches.len(),
3778 "extension_id": extension_id, "method": method,
3779 }),
3780 envelope,
3781 ).await {
3782 tracing::warn!("Failed to audit WS extension memory poisoning: {}", e);
3783 }
3784 let denial =
3785 make_ws_error_response(Some(id), -32001, "Denied by policy");
3786 let mut sink = client_sink.lock().await;
3787 let _ = sink.send(Message::Text(denial.into())).await;
3788 continue;
3789 }
3790 }
3791
3792 let mut action =
3793 extractor::extract_extension_action(extension_id, method, ¶ms);
3794 let mut matched_approval_id: Option<String> = None;
3795
3796 // SECURITY (FIND-R118-004): DNS resolution for extension methods.
3797 // Parity with ToolCall (line 710) and ResourceRead (line 1439).
3798 if state.engine.has_ip_rules() {
3799 super::helpers::resolve_domains(&mut action).await;
3800 }
3801
3802 let ext_key = format!("extension:{extension_id}:{method}");
3803
3804 // SECURITY (FIND-R130-002): TOCTOU-safe context+eval+update
3805 // for extension methods. Matches ToolCall/ResourceRead fixes.
3806
3807 // Pre-compute security context BEFORE acquiring DashMap write lock
3808 // to avoid deadlock — build_ws_runtime_security_context internally
3809 // calls sessions.get()/get_mut() which would re-enter the lock.
3810 let pre_eval_ctx = state
3811 .sessions
3812 .get(&session_id)
3813 .map(|session| EvaluationContext {
3814 timestamp: None,
3815 agent_id: session.oauth_subject.clone(),
3816 agent_identity: session.agent_identity.clone(),
3817 call_chain: session.current_call_chain.clone(),
3818 ..EvaluationContext::default()
3819 })
3820 .unwrap_or_else(|| EvaluationContext {
3821 agent_id: oauth_claims.as_ref().map(|claims| claims.sub.clone()),
3822 ..EvaluationContext::default()
3823 });
3824 let security_context = build_ws_runtime_security_context(
3825 &parsed,
3826 &action,
3827 &handshake_headers,
3828 super::helpers::TransportSecurityInputs {
3829 oauth_evidence: oauth_claims.as_ref(),
3830 eval_ctx: Some(&pre_eval_ctx),
3831 sessions: &state.sessions,
3832 session_id: Some(&session_id),
3833 trusted_request_signers: &state.trusted_request_signers,
3834 detached_signature_freshness: state.detached_signature_freshness,
3835 },
3836 );
3837
3838 // TOCTOU-safe evaluation + session update (write lock)
3839 let (mediation_result, ctx) = if let Some(mut session) =
3840 state.sessions.get_mut(&session_id)
3841 {
3842 let ctx = EvaluationContext {
3843 timestamp: None,
3844 agent_id: session.oauth_subject.clone(),
3845 agent_identity: session.agent_identity.clone(),
3846 call_counts: session.call_counts.clone(),
3847 previous_actions: session.action_history.iter().cloned().collect(),
3848 call_chain: session.current_call_chain.clone(),
3849 tenant_id: None,
3850 verification_tier: None,
3851 capability_token: None,
3852 session_state: None,
3853 };
3854 let result = mediate_with_security_context(
3855 &uuid::Uuid::new_v4().to_string().replace('-', ""),
3856 &action,
3857 &state.engine,
3858 Some(&ctx),
3859 security_context.as_ref(),
3860 "websocket",
3861 &state.mediation_config,
3862 Some(&session_id),
3863 None,
3864 );
3865
3866 // Atomically update session on Allow
3867 if matches!(result.verdict, Verdict::Allow) {
3868 session.touch();
3869 use crate::proxy::call_chain::{
3870 MAX_ACTION_HISTORY, MAX_CALL_COUNT_TOOLS,
3871 };
3872 if session.call_counts.len() < MAX_CALL_COUNT_TOOLS
3873 || session.call_counts.contains_key(&ext_key)
3874 {
3875 let count =
3876 session.call_counts.entry(ext_key.clone()).or_insert(0);
3877 *count = count.saturating_add(1);
3878 }
3879 if session.action_history.len() >= MAX_ACTION_HISTORY {
3880 session.action_history.pop_front();
3881 }
3882 session.action_history.push_back(ext_key.clone());
3883 }
3884
3885 (result, ctx)
3886 } else {
3887 let ctx = EvaluationContext::default();
3888 let result = mediate_with_security_context(
3889 &uuid::Uuid::new_v4().to_string().replace('-', ""),
3890 &action,
3891 &state.engine,
3892 None,
3893 security_context.as_ref(),
3894 "websocket",
3895 &state.mediation_config,
3896 Some(&session_id),
3897 None,
3898 );
3899 (result, ctx)
3900 };
3901
3902 let mut final_origin = mediation_result.origin;
3903 let mut acis_envelope = mediation_result.envelope.clone();
3904 let mut refresh_envelope = false;
3905 let verdict = match mediation_result.verdict {
3906 Verdict::RequireApproval { reason } => {
3907 match super::helpers::presented_approval_matches_action(
3908 &state,
3909 &session_id,
3910 presented_approval_id.as_deref(),
3911 &action,
3912 )
3913 .await
3914 {
3915 Ok(Some(approval_id)) => {
3916 matched_approval_id = Some(approval_id);
3917 final_origin = DecisionOrigin::PolicyEngine;
3918 refresh_envelope = true;
3919 Verdict::Allow
3920 }
3921 Ok(None) => Verdict::RequireApproval { reason },
3922 Err(()) => {
3923 final_origin = DecisionOrigin::ApprovalGate;
3924 refresh_envelope = true;
3925 Verdict::Deny {
3926 reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
3927 }
3928 }
3929 }
3930 }
3931 other => other,
3932 };
3933 if refresh_envelope {
3934 acis_envelope = refresh_ws_acis_envelope(
3935 &acis_envelope,
3936 &action,
3937 &verdict,
3938 final_origin,
3939 &session_id,
3940 &ctx,
3941 security_context.as_ref(),
3942 );
3943 }
3944
3945 match verdict {
3946 Verdict::Allow => {
3947 // SECURITY (FIND-R118-002): ABAC refinement for extension methods.
3948 // Parity with ToolCall (line 1099) and ResourceRead (line 1498).
3949 if let Some(ref abac) = state.abac_engine {
3950 let principal_id =
3951 ctx.agent_id.as_deref().unwrap_or("anonymous");
3952 let principal_type = ctx.principal_type();
3953 let session_risk = state
3954 .sessions
3955 .get_mut(&session_id)
3956 .and_then(|s| s.risk_score.clone());
3957 let abac_ctx = vellaveto_engine::abac::AbacEvalContext {
3958 eval_ctx: &ctx,
3959 principal_type,
3960 principal_id,
3961 risk_score: session_risk.as_ref(),
3962 };
3963 match abac.evaluate(&action, &abac_ctx) {
3964 vellaveto_engine::abac::AbacDecision::Deny {
3965 policy_id,
3966 reason,
3967 } => {
3968 let deny_verdict = Verdict::Deny {
3969 reason: reason.clone(),
3970 };
3971 let abac_security_context =
3972 super::helpers::abac_deny_security_context(&action);
3973 let envelope =
3974 build_secondary_acis_envelope_with_security_context(
3975 &action,
3976 &deny_verdict,
3977 DecisionOrigin::PolicyEngine,
3978 "websocket",
3979 Some(&session_id),
3980 Some(&abac_security_context),
3981 );
3982 if let Err(e) = state
3983 .audit
3984 .log_entry_with_acis(
3985 &action,
3986 &deny_verdict,
3987 json!({
3988 "source": "ws_proxy",
3989 "session": session_id,
3990 "transport": "websocket",
3991 "event": "abac_deny",
3992 "extension_id": extension_id,
3993 "abac_policy": policy_id,
3994 }),
3995 envelope,
3996 )
3997 .await
3998 {
3999 tracing::warn!(
4000 "Failed to audit WS extension ABAC deny: {}",
4001 e
4002 );
4003 }
4004 let error_resp = make_ws_error_response(
4005 Some(id),
4006 -32001,
4007 "Denied by policy",
4008 );
4009 let mut sink = client_sink.lock().await;
4010 let _ =
4011 sink.send(Message::Text(error_resp.into())).await;
4012 continue;
4013 }
4014 vellaveto_engine::abac::AbacDecision::Allow {
4015 policy_id,
4016 } => {
4017 if let Some(ref la) = state.least_agency {
4018 la.record_usage(
4019 principal_id,
4020 &session_id,
4021 &policy_id,
4022 &ext_key,
4023 method,
4024 );
4025 }
4026 }
4027 vellaveto_engine::abac::AbacDecision::NoMatch => {
4028 // Fall through — existing Allow stands
4029 }
4030 #[allow(unreachable_patterns)]
4031 // AbacDecision is #[non_exhaustive]
4032 _ => {
4033 // SECURITY: Future variants — fail-closed (deny).
4034 tracing::warn!(
4035 "Unknown AbacDecision variant — fail-closed"
4036 );
4037 let error_resp = make_ws_error_response(
4038 Some(id),
4039 -32001,
4040 "Denied by policy",
4041 );
4042 let mut sink = client_sink.lock().await;
4043 let _ =
4044 sink.send(Message::Text(error_resp.into())).await;
4045 continue;
4046 }
4047 }
4048 }
4049
4050 // NOTE: Session touch + call_counts/action_history
4051 // update already performed inside the TOCTOU-safe
4052 // block above (FIND-R130-002). No separate update here.
4053
4054 if super::helpers::consume_presented_approval(
4055 &state,
4056 &session_id,
4057 matched_approval_id.as_deref(),
4058 &action,
4059 security_context.as_ref(),
4060 )
4061 .await
4062 .is_err()
4063 {
4064 let deny_verdict = Verdict::Deny {
4065 reason: INVALID_PRESENTED_APPROVAL_REASON.to_string(),
4066 };
4067 let invalid_approval_security_context =
4068 super::helpers::invalid_presented_approval_security_context(
4069 &action,
4070 );
4071 let combined_security_context =
4072 super::helpers::merge_transport_security_context(
4073 security_context.as_ref(),
4074 Some(&invalid_approval_security_context),
4075 );
4076 let effective_security_context = combined_security_context
4077 .as_ref()
4078 .unwrap_or(&invalid_approval_security_context);
4079 let envelope =
4080 build_secondary_acis_envelope_with_security_context(
4081 &action,
4082 &deny_verdict,
4083 DecisionOrigin::ApprovalGate,
4084 "websocket",
4085 Some(&session_id),
4086 Some(effective_security_context),
4087 );
4088 let _ = state
4089 .audit
4090 .log_entry_with_acis(
4091 &action,
4092 &deny_verdict,
4093 json!({
4094 "source": "ws_proxy",
4095 "session": session_id,
4096 "transport": "websocket",
4097 "event": "presented_approval_replay_denied",
4098 "approval_id": matched_approval_id,
4099 "extension_id": extension_id,
4100 "method": method,
4101 }),
4102 envelope,
4103 )
4104 .await;
4105 let error_resp = make_ws_error_response(
4106 Some(id),
4107 -32001,
4108 "Denied by policy",
4109 );
4110 let mut sink = client_sink.lock().await;
4111 let _ = sink.send(Message::Text(error_resp.into())).await;
4112 continue;
4113 }
4114
4115 if let Err(e) = state
4116 .audit
4117 .log_entry_with_acis(
4118 &action,
4119 &Verdict::Allow,
4120 json!({
4121 "source": "ws_proxy",
4122 "session": session_id,
4123 "transport": "websocket",
4124 "extension_id": extension_id,
4125 }),
4126 acis_envelope,
4127 )
4128 .await
4129 {
4130 tracing::error!(
4131 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
4132 e
4133 );
4134 // SECURITY (FIND-R215-007): Strict audit mode — fail-closed.
4135 // Parity with Deny and RequireApproval paths.
4136 if state.audit_strict_mode {
4137 let error = make_ws_error_response(
4138 Some(id),
4139 -32000,
4140 "Audit logging failed — request denied (strict audit mode)",
4141 );
4142 let mut sink = client_sink.lock().await;
4143 let _ = sink.send(Message::Text(error.into())).await;
4144 continue;
4145 }
4146 }
4147 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
4148 let forward_text = if state.canonicalize {
4149 match serde_json::to_string(&parsed) {
4150 Ok(canonical) => canonical,
4151 Err(e) => {
4152 tracing::error!("SECURITY: WS extension canonicalization failed: {}", e);
4153 let error_resp = make_ws_error_response(
4154 Some(id),
4155 -32603,
4156 "Internal error",
4157 );
4158 let mut sink = client_sink.lock().await;
4159 let _ =
4160 sink.send(Message::Text(error_resp.into())).await;
4161 continue;
4162 }
4163 }
4164 } else {
4165 text.to_string()
4166 };
4167 let mut sink = upstream_sink.lock().await;
4168 if let Err(e) = sink
4169 .send(tokio_tungstenite::tungstenite::Message::Text(
4170 forward_text.into(),
4171 ))
4172 .await
4173 {
4174 tracing::error!("Failed to forward extension request: {}", e);
4175 break;
4176 }
4177 }
4178 Verdict::Deny { ref reason } => {
4179 let mut audit_metadata = json!({
4180 "source": "ws_proxy",
4181 "session": session_id,
4182 "transport": "websocket",
4183 "extension_id": extension_id,
4184 });
4185 if reason == INVALID_PRESENTED_APPROVAL_REASON
4186 && presented_approval_id.is_some()
4187 {
4188 audit_metadata["event"] =
4189 json!("presented_approval_replay_denied");
4190 audit_metadata["approval_id"] = json!(presented_approval_id);
4191 audit_metadata["method"] = json!(method);
4192 }
4193 if let Err(e) = state
4194 .audit
4195 .log_entry_with_acis(
4196 &action,
4197 &Verdict::Deny {
4198 reason: reason.clone(),
4199 },
4200 audit_metadata,
4201 acis_envelope,
4202 )
4203 .await
4204 {
4205 tracing::error!(
4206 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
4207 e
4208 );
4209 // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
4210 if state.audit_strict_mode {
4211 let error = make_ws_error_response(
4212 Some(id),
4213 -32000,
4214 "Audit logging failed — request denied (strict audit mode)",
4215 );
4216 let mut sink = client_sink.lock().await;
4217 let _ = sink.send(Message::Text(error.into())).await;
4218 continue;
4219 }
4220 }
4221 // SECURITY (FIND-R213-001): Generic denial message — do not leak
4222 // detailed policy reason to client. Reason is in the audit log.
4223 let _ = reason;
4224 let denial =
4225 make_ws_error_response(Some(id), -32001, "Denied by policy");
4226 let mut sink = client_sink.lock().await;
4227 let _ = sink.send(Message::Text(denial.into())).await;
4228 }
4229 Verdict::RequireApproval { ref reason, .. } => {
4230 let approval_verdict = Verdict::RequireApproval {
4231 reason: reason.clone(),
4232 };
4233 let containment_context =
4234 super::helpers::approval_containment_context_from_envelope(
4235 &acis_envelope,
4236 reason,
4237 );
4238 if let Err(e) = state
4239 .audit
4240 .log_entry_with_acis(
4241 &action,
4242 &approval_verdict,
4243 json!({
4244 "source": "ws_proxy",
4245 "session": session_id,
4246 "transport": "websocket",
4247 "extension_id": extension_id,
4248 }),
4249 acis_envelope,
4250 )
4251 .await
4252 {
4253 tracing::error!(
4254 "AUDIT FAILURE in WS proxy: security decision not recorded: {}",
4255 e
4256 );
4257 // SECURITY (FIND-R213-002): Strict audit mode — fail-closed.
4258 if state.audit_strict_mode {
4259 let error = make_ws_error_response(
4260 Some(id),
4261 -32000,
4262 "Audit logging failed — request denied (strict audit mode)",
4263 );
4264 let mut sink = client_sink.lock().await;
4265 let _ = sink.send(Message::Text(error.into())).await;
4266 continue;
4267 }
4268 }
4269 let approval_id =
4270 super::helpers::create_pending_approval_with_context(
4271 &state,
4272 &session_id,
4273 &action,
4274 reason,
4275 containment_context,
4276 )
4277 .await;
4278 let denial = make_ws_error_response_with_data(
4279 Some(id),
4280 -32001,
4281 "Approval required",
4282 Some(json!({
4283 "verdict": "require_approval",
4284 "reason": reason,
4285 "approval_id": approval_id,
4286 })),
4287 );
4288 let mut sink = client_sink.lock().await;
4289 let _ = sink.send(Message::Text(denial.into())).await;
4290 }
4291 _ => {
4292 let denial =
4293 make_ws_error_response(Some(id), -32001, "Denied by policy");
4294 let mut sink = client_sink.lock().await;
4295 let _ = sink.send(Message::Text(denial.into())).await;
4296 }
4297 }
4298 }
4299 MessageType::ElicitationRequest { ref id } => {
4300 // SECURITY (FIND-R46-010): Policy checks for elicitation requests.
4301 // Match the HTTP POST handler's elicitation inspection logic.
4302 let params = parsed.get("params").cloned().unwrap_or(json!({}));
4303 let elicitation_verdict = {
4304 let mut session_ref = state.sessions.get_mut(&session_id);
4305 let current_count = session_ref
4306 .as_ref()
4307 .map(|s| s.elicitation_count)
4308 .unwrap_or(0);
4309 let verdict = vellaveto_mcp::elicitation::inspect_elicitation(
4310 ¶ms,
4311 &state.elicitation_config,
4312 current_count,
4313 );
4314 // Pre-increment while holding the lock to close the TOCTOU gap
4315 if matches!(
4316 verdict,
4317 vellaveto_mcp::elicitation::ElicitationVerdict::Allow
4318 ) {
4319 if let Some(ref mut s) = session_ref {
4320 // SECURITY (FIND-R51-008): Use saturating_add for consistency.
4321 s.elicitation_count = s.elicitation_count.saturating_add(1);
4322 }
4323 }
4324 verdict
4325 };
4326 match elicitation_verdict {
4327 vellaveto_mcp::elicitation::ElicitationVerdict::Allow => {
4328 let action = Action::new(
4329 "vellaveto",
4330 "ws_forward_message",
4331 json!({
4332 "message_type": "elicitation_request",
4333 "session": session_id,
4334 "transport": "websocket",
4335 "direction": "client_to_upstream",
4336 }),
4337 );
4338 let elicitation_security_context =
4339 super::helpers::protocol_forward_channel_security_context(
4340 "ws_elicitation_forwarded",
4341 vellaveto_types::ContextChannel::ApprovalPrompt,
4342 );
4343 let envelope = build_secondary_acis_envelope_with_security_context(
4344 &action,
4345 &Verdict::Allow,
4346 DecisionOrigin::PolicyEngine,
4347 "websocket",
4348 Some(&session_id),
4349 Some(&elicitation_security_context),
4350 );
4351 if let Err(e) = state
4352 .audit
4353 .log_entry_with_acis(
4354 &action,
4355 &Verdict::Allow,
4356 json!({
4357 "source": "ws_proxy",
4358 "event": "ws_elicitation_forwarded",
4359 }),
4360 envelope,
4361 )
4362 .await
4363 {
4364 tracing::warn!("Failed to audit WS elicitation: {}", e);
4365 }
4366
4367 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
4368 let forward_text = if state.canonicalize {
4369 match serde_json::to_string(&parsed) {
4370 Ok(canonical) => canonical,
4371 Err(e) => {
4372 tracing::error!("SECURITY: WS elicitation canonicalization failed: {}", e);
4373 let error_resp = make_ws_error_response(
4374 Some(id),
4375 -32603,
4376 "Internal error",
4377 );
4378 let mut sink = client_sink.lock().await;
4379 let _ =
4380 sink.send(Message::Text(error_resp.into())).await;
4381 continue;
4382 }
4383 }
4384 } else {
4385 text.to_string()
4386 };
4387 let mut sink = upstream_sink.lock().await;
4388 if let Err(e) = sink
4389 .send(tokio_tungstenite::tungstenite::Message::Text(
4390 forward_text.into(),
4391 ))
4392 .await
4393 {
4394 // Rollback pre-incremented count on forward failure
4395 if let Some(mut s) = state.sessions.get_mut(&session_id) {
4396 s.elicitation_count = s.elicitation_count.saturating_sub(1);
4397 }
4398 tracing::error!("Failed to forward elicitation: {}", e);
4399 break;
4400 }
4401 }
4402 vellaveto_mcp::elicitation::ElicitationVerdict::Deny { reason } => {
4403 tracing::warn!(
4404 session_id = %session_id,
4405 "Blocked WS elicitation/create: {}",
4406 reason
4407 );
4408 let action = Action::new(
4409 "vellaveto",
4410 "ws_elicitation_interception",
4411 json!({
4412 "method": "elicitation/create",
4413 "session": session_id,
4414 "transport": "websocket",
4415 "reason": &reason,
4416 }),
4417 );
4418 let verdict = Verdict::Deny {
4419 reason: reason.clone(),
4420 };
4421 let elicitation_security_context =
4422 super::helpers::elicitation_interception_security_context(
4423 &action,
4424 );
4425 let envelope = build_secondary_acis_envelope_with_security_context(
4426 &action,
4427 &verdict,
4428 DecisionOrigin::PolicyEngine,
4429 "websocket",
4430 Some(&session_id),
4431 Some(&elicitation_security_context),
4432 );
4433 if let Err(e) = state
4434 .audit
4435 .log_entry_with_acis(
4436 &action,
4437 &verdict,
4438 json!({
4439 "source": "ws_proxy",
4440 "event": "ws_elicitation_interception",
4441 }),
4442 envelope,
4443 )
4444 .await
4445 {
4446 tracing::warn!(
4447 "Failed to audit WS elicitation interception: {}",
4448 e
4449 );
4450 }
4451 // SECURITY (FIND-R46-012, FIND-R55-WS-006): Generic message to client.
4452 let error =
4453 make_ws_error_response(Some(id), -32001, "Denied by policy");
4454 let mut sink = client_sink.lock().await;
4455 let _ = sink.send(Message::Text(error.into())).await;
4456 }
4457 }
4458 }
4459 MessageType::PassThrough | MessageType::ProgressNotification { .. } => {
4460 // SECURITY (FIND-R76-003): DLP scan PassThrough params for secrets.
4461 // Parity with HTTP handler (handlers.rs:1795-1859). Agents could
4462 // exfiltrate secrets via prompts/get, completion/complete, or any
4463 // PassThrough method's parameters.
4464 // SECURITY (FIND-R97-001): Remove method gate — JSON-RPC responses
4465 // (sampling/elicitation replies) have no `method` field but carry
4466 // data in `result`. Parity with stdio proxy FIND-R96-001.
4467 if state.response_dlp_enabled {
4468 let mut dlp_findings = scan_notification_for_secrets(&parsed);
4469 // SECURITY (FIND-R97-001): Also scan `result` field for responses.
4470 if let Some(result_val) = parsed.get("result") {
4471 dlp_findings.extend(scan_parameters_for_secrets(result_val));
4472 }
4473 // SECURITY (FIND-R83-006): Cap combined findings from params+result
4474 // scans to maintain per-scan invariant (1000).
4475 dlp_findings.truncate(1000);
4476 if !dlp_findings.is_empty() {
4477 for finding in &dlp_findings {
4478 record_dlp_finding(&finding.pattern_name);
4479 }
4480 let patterns: Vec<String> = dlp_findings
4481 .iter()
4482 .map(|f| format!("{}:{}", f.pattern_name, f.location))
4483 .collect();
4484 tracing::warn!(
4485 "SECURITY: Secrets in WS passthrough params! Session: {}, Findings: {:?}",
4486 session_id,
4487 patterns
4488 );
4489 let n_action = Action::new(
4490 "vellaveto",
4491 "notification_dlp_scan",
4492 json!({
4493 "findings": patterns,
4494 "session": session_id,
4495 "transport": "websocket",
4496 }),
4497 );
4498 let verdict = if state.response_dlp_blocking {
4499 Verdict::Deny {
4500 reason: format!(
4501 "Notification blocked: secrets detected ({patterns:?})"
4502 ),
4503 }
4504 } else {
4505 Verdict::Allow
4506 };
4507 let notification_security_context =
4508 super::helpers::notification_dlp_security_context(
4509 &parsed,
4510 state.response_dlp_blocking,
4511 );
4512 let envelope = build_secondary_acis_envelope_with_security_context(
4513 &n_action,
4514 &verdict,
4515 DecisionOrigin::Dlp,
4516 "websocket",
4517 Some(&session_id),
4518 Some(¬ification_security_context),
4519 );
4520 if let Err(e) = state
4521 .audit
4522 .log_entry_with_acis(
4523 &n_action,
4524 &verdict,
4525 json!({
4526 "source": "ws_proxy",
4527 "event": "notification_dlp_alert",
4528 "blocked": state.response_dlp_blocking,
4529 }),
4530 envelope,
4531 )
4532 .await
4533 {
4534 tracing::warn!("Failed to audit WS passthrough DLP: {}", e);
4535 }
4536 if state.response_dlp_blocking {
4537 // Drop the message silently (passthrough has no id to respond to)
4538 continue;
4539 }
4540 }
4541 }
4542
4543 // SECURITY (FIND-R130-001): Injection scanning on PassThrough parameters.
4544 // Parity with HTTP handler (handlers.rs FIND-R112-008) and gRPC handler
4545 // (service.rs FIND-R113-001). An agent could inject prompt injection
4546 // payloads via any PassThrough method's parameters.
4547 if !state.injection_disabled {
4548 let mut inj_parts = Vec::new();
4549 if let Some(params) = parsed.get("params") {
4550 extract_strings_recursive(params, &mut inj_parts, 0);
4551 }
4552 if let Some(result) = parsed.get("result") {
4553 extract_strings_recursive(result, &mut inj_parts, 0);
4554 }
4555 let scannable = inj_parts.join("\n");
4556 if !scannable.is_empty() {
4557 let injection_matches: Vec<String> =
4558 if let Some(ref scanner) = state.injection_scanner {
4559 scanner
4560 .inspect(&scannable)
4561 .into_iter()
4562 .map(|s| s.to_string())
4563 .collect()
4564 } else {
4565 inspect_for_injection(&scannable)
4566 .into_iter()
4567 .map(|s| s.to_string())
4568 .collect()
4569 };
4570
4571 if !injection_matches.is_empty() {
4572 tracing::warn!(
4573 "SECURITY: Injection in WS passthrough params! \
4574 Session: {}, Patterns: {:?}",
4575 session_id,
4576 injection_matches,
4577 );
4578
4579 let verdict = if state.injection_blocking {
4580 Verdict::Deny {
4581 reason: format!(
4582 "WS passthrough injection blocked: {injection_matches:?}"
4583 ),
4584 }
4585 } else {
4586 Verdict::Allow
4587 };
4588
4589 let inj_action = Action::new(
4590 "vellaveto",
4591 "ws_passthrough_injection_scan",
4592 json!({
4593 "matched_patterns": injection_matches,
4594 "session": session_id,
4595 "transport": "websocket",
4596 "direction": "client_to_upstream",
4597 }),
4598 );
4599 let injection_security_context =
4600 super::helpers::notification_injection_security_context(
4601 &parsed,
4602 state.injection_blocking,
4603 "passthrough_injection",
4604 );
4605 let envelope =
4606 build_secondary_acis_envelope_with_security_context(
4607 &inj_action,
4608 &verdict,
4609 DecisionOrigin::InjectionScanner,
4610 "websocket",
4611 Some(&session_id),
4612 Some(&injection_security_context),
4613 );
4614 if let Err(e) = state
4615 .audit
4616 .log_entry_with_acis(
4617 &inj_action,
4618 &verdict,
4619 json!({
4620 "source": "ws_proxy",
4621 "event": "ws_passthrough_injection_detected",
4622 "blocking": state.injection_blocking,
4623 }),
4624 envelope,
4625 )
4626 .await
4627 {
4628 tracing::warn!(
4629 "Failed to audit WS passthrough injection: {}",
4630 e
4631 );
4632 }
4633
4634 if state.injection_blocking {
4635 // Drop the message (passthrough has no id to respond to)
4636 continue;
4637 }
4638 }
4639 }
4640 }
4641
4642 // SECURITY (IMP-R182-009): Memory poisoning check — parity with
4643 // tool calls, resource reads, tasks, and extension methods.
4644 if let Some(mut session) = state.sessions.get_mut(&session_id) {
4645 let params_to_scan = parsed.get("params").cloned().unwrap_or(json!({}));
4646 // SECURITY (IMP-R184-010): Also scan `result` field — parity
4647 // with DLP scan which scans both params and result.
4648 let mut poisoning_matches =
4649 session.memory_tracker.check_parameters(¶ms_to_scan);
4650 if let Some(result_val) = parsed.get("result") {
4651 poisoning_matches
4652 .extend(session.memory_tracker.check_parameters(result_val));
4653 }
4654 if !poisoning_matches.is_empty() {
4655 let method_name = parsed
4656 .get("method")
4657 .and_then(|m| m.as_str())
4658 .unwrap_or("unknown");
4659 for m in &poisoning_matches {
4660 tracing::warn!(
4661 "SECURITY: Memory poisoning in WS passthrough '{}' (session {}): \
4662 param '{}' replayed data (fingerprint: {})",
4663 method_name,
4664 session_id,
4665 m.param_location,
4666 m.fingerprint
4667 );
4668 }
4669 let poison_action = Action::new(
4670 "vellaveto",
4671 "ws_passthrough_memory_poisoning",
4672 json!({
4673 "method": method_name,
4674 "session": session_id,
4675 "matches": poisoning_matches.len(),
4676 "transport": "websocket",
4677 }),
4678 );
4679 let passthrough_poison_verdict = Verdict::Deny {
4680 reason: format!(
4681 "WS passthrough blocked: memory poisoning ({} matches)",
4682 poisoning_matches.len()
4683 ),
4684 };
4685 let poisoning_security_context =
4686 super::helpers::notification_memory_poisoning_security_context(
4687 &parsed,
4688 "memory_poisoning",
4689 );
4690 let envelope = build_secondary_acis_envelope_with_security_context(
4691 &poison_action,
4692 &passthrough_poison_verdict,
4693 DecisionOrigin::MemoryPoisoning,
4694 "websocket",
4695 Some(&session_id),
4696 Some(&poisoning_security_context),
4697 );
4698 if let Err(e) = state
4699 .audit
4700 .log_entry_with_acis(
4701 &poison_action,
4702 &passthrough_poison_verdict,
4703 json!({
4704 "source": "ws_proxy",
4705 "event": "ws_passthrough_memory_poisoning",
4706 }),
4707 envelope,
4708 )
4709 .await
4710 {
4711 tracing::warn!(
4712 "Failed to audit WS passthrough memory poisoning: {}",
4713 e
4714 );
4715 }
4716 continue; // Drop the message
4717 }
4718 // Fingerprint for future poisoning detection.
4719 session.memory_tracker.extract_from_value(¶ms_to_scan);
4720 if let Some(result_val) = parsed.get("result") {
4721 session.memory_tracker.extract_from_value(result_val);
4722 }
4723 } else {
4724 // IMP-R186-005: Log when session is missing so the skip is observable.
4725 tracing::warn!(
4726 "Session {} not found for WS passthrough memory poisoning check",
4727 session_id
4728 );
4729 }
4730
4731 // SECURITY (FIND-R46-WS-004): Audit log forwarded passthrough/notification messages
4732 let msg_type = match &classified {
4733 MessageType::ProgressNotification { .. } => "progress_notification",
4734 _ => "passthrough",
4735 };
4736 let action = Action::new(
4737 "vellaveto",
4738 "ws_forward_message",
4739 json!({
4740 "message_type": msg_type,
4741 "session": session_id,
4742 "transport": "websocket",
4743 "direction": "client_to_upstream",
4744 }),
4745 );
4746 let protocol_security_context =
4747 super::helpers::protocol_forward_security_context(
4748 "ws_message_forwarded",
4749 );
4750 let envelope = build_secondary_acis_envelope_with_security_context(
4751 &action,
4752 &Verdict::Allow,
4753 DecisionOrigin::PolicyEngine,
4754 "websocket",
4755 Some(&session_id),
4756 Some(&protocol_security_context),
4757 );
4758 if let Err(e) = state
4759 .audit
4760 .log_entry_with_acis(
4761 &action,
4762 &Verdict::Allow,
4763 json!({
4764 "source": "ws_proxy",
4765 "event": "ws_message_forwarded",
4766 }),
4767 envelope,
4768 )
4769 .await
4770 {
4771 tracing::warn!("Failed to audit WS passthrough: {}", e);
4772 }
4773
4774 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
4775 let forward_text = if state.canonicalize {
4776 match serde_json::to_string(&parsed) {
4777 Ok(canonical) => canonical,
4778 Err(e) => {
4779 tracing::error!(
4780 "SECURITY: WS passthrough canonicalization failed: {}",
4781 e
4782 );
4783 continue;
4784 }
4785 }
4786 } else {
4787 text.to_string()
4788 };
4789 let mut sink = upstream_sink.lock().await;
4790 if let Err(e) = sink
4791 .send(tokio_tungstenite::tungstenite::Message::Text(
4792 forward_text.into(),
4793 ))
4794 .await
4795 {
4796 tracing::error!("Failed to forward passthrough: {}", e);
4797 break;
4798 }
4799 }
4800 }
4801 }
4802 Message::Binary(_data) => {
4803 // SECURITY: Binary frames not allowed for JSON-RPC
4804 tracing::warn!(
4805 session_id = %session_id,
4806 "Binary WebSocket frame rejected (JSON-RPC is text-only)"
4807 );
4808
4809 // SECURITY (FIND-R46-WS-004): Audit log binary frame rejection
4810 let action = Action::new(
4811 "vellaveto",
4812 "ws_binary_frame_rejected",
4813 json!({
4814 "session": session_id,
4815 "transport": "websocket",
4816 "direction": "client_to_upstream",
4817 }),
4818 );
4819 let binary_verdict = Verdict::Deny {
4820 reason: "Binary frames not supported for JSON-RPC".to_string(),
4821 };
4822 let binary_security_context =
4823 super::helpers::protocol_binary_rejection_security_context(
4824 "ws_binary_frame_rejected",
4825 );
4826 let envelope = build_secondary_acis_envelope_with_security_context(
4827 &action,
4828 &binary_verdict,
4829 DecisionOrigin::PolicyEngine,
4830 "websocket",
4831 Some(&session_id),
4832 Some(&binary_security_context),
4833 );
4834 if let Err(e) = state
4835 .audit
4836 .log_entry_with_acis(
4837 &action,
4838 &binary_verdict,
4839 json!({
4840 "source": "ws_proxy",
4841 "event": "ws_binary_frame_rejected",
4842 }),
4843 envelope,
4844 )
4845 .await
4846 {
4847 tracing::warn!("Failed to audit WS binary frame rejection: {}", e);
4848 }
4849
4850 let mut sink = client_sink.lock().await;
4851 let _ = sink
4852 .send(Message::Close(Some(CloseFrame {
4853 code: CLOSE_UNSUPPORTED_DATA,
4854 reason: "Binary frames not supported".into(),
4855 })))
4856 .await;
4857 break;
4858 }
4859 Message::Ping(data) => {
4860 let mut sink = client_sink.lock().await;
4861 let _ = sink.send(Message::Pong(data)).await;
4862 }
4863 Message::Pong(_) => {
4864 // Ignored
4865 }
4866 Message::Close(_) => {
4867 tracing::debug!(session_id = %session_id, "Client sent close frame");
4868 break;
4869 }
4870 }
4871 }
4872}
4873
4874/// Relay messages from upstream to client with DLP and injection scanning.
4875#[allow(clippy::too_many_arguments)]
4876async fn relay_upstream_to_client(
4877 mut upstream_stream: futures_util::stream::SplitStream<
4878 tokio_tungstenite::WebSocketStream<
4879 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
4880 >,
4881 >,
4882 client_sink: Arc<Mutex<futures_util::stream::SplitSink<WebSocket, Message>>>,
4883 state: ProxyState,
4884 session_id: String,
4885 ws_config: WebSocketConfig,
4886 upstream_rate_counter: Arc<AtomicU64>,
4887 upstream_rate_window_start: Arc<std::sync::Mutex<std::time::Instant>>,
4888 last_activity: Arc<AtomicU64>,
4889 connection_epoch: std::time::Instant,
4890) {
4891 while let Some(msg_result) = upstream_stream.next().await {
4892 let msg = match msg_result {
4893 Ok(m) => m,
4894 Err(e) => {
4895 tracing::debug!(session_id = %session_id, "Upstream WS error: {}", e);
4896 break;
4897 }
4898 };
4899
4900 // SECURITY (FIND-R182-001): Update last-activity for true idle detection.
4901 // SECURITY (R251-WS-1): SeqCst ensures visibility to timeout checker thread.
4902 last_activity.store(
4903 connection_epoch.elapsed().as_millis() as u64,
4904 Ordering::SeqCst,
4905 );
4906
4907 record_ws_message("upstream_to_client");
4908
4909 // SECURITY (FIND-R46-WS-003): Rate limiting on upstream→client direction.
4910 // A malicious or compromised upstream could flood the client with messages.
4911 if !check_rate_limit(
4912 &upstream_rate_counter,
4913 &upstream_rate_window_start,
4914 ws_config.upstream_rate_limit,
4915 ) {
4916 tracing::warn!(
4917 session_id = %session_id,
4918 "WebSocket upstream rate limit exceeded ({}/s), dropping message",
4919 ws_config.upstream_rate_limit,
4920 );
4921
4922 let action = Action::new(
4923 "vellaveto",
4924 "ws_upstream_rate_limit",
4925 json!({
4926 "session": session_id,
4927 "transport": "websocket",
4928 "direction": "upstream_to_client",
4929 "limit": ws_config.upstream_rate_limit,
4930 }),
4931 );
4932 let rate_verdict = Verdict::Deny {
4933 reason: "Upstream rate limit exceeded".to_string(),
4934 };
4935 let rate_limit_security_context =
4936 super::helpers::protocol_rate_limit_security_context("ws_upstream_rate_limit");
4937 let envelope = build_secondary_acis_envelope_with_security_context(
4938 &action,
4939 &rate_verdict,
4940 DecisionOrigin::RateLimiter,
4941 "websocket",
4942 Some(&session_id),
4943 Some(&rate_limit_security_context),
4944 );
4945 if let Err(e) = state
4946 .audit
4947 .log_entry_with_acis(
4948 &action,
4949 &rate_verdict,
4950 json!({
4951 "source": "ws_proxy",
4952 "event": "ws_upstream_rate_limit_exceeded",
4953 }),
4954 envelope,
4955 )
4956 .await
4957 {
4958 tracing::warn!("Failed to audit WS upstream rate limit: {}", e);
4959 }
4960
4961 metrics::counter!(
4962 "vellaveto_ws_upstream_rate_limited_total",
4963 "session" => session_id.clone()
4964 )
4965 .increment(1);
4966
4967 // Drop the message (don't close the connection — upstream flood should not
4968 // disconnect the client, just throttle the flow)
4969 continue;
4970 }
4971
4972 match msg {
4973 tokio_tungstenite::tungstenite::Message::Text(text) => {
4974 // Try to parse for scanning
4975 let forward = if let Ok(json_val) = serde_json::from_str::<Value>(&text) {
4976 // Resolve tracked tool context for response-side schema checks.
4977 let tracked_tool_name =
4978 take_tracked_tool_call(&state.sessions, &session_id, json_val.get("id"));
4979
4980 // SECURITY (FIND-R75-003): Track whether DLP or injection was detected
4981 // (even in log-only mode) to gate memory_tracker.record_response().
4982 // Recording fingerprints from tainted responses would poison the tracker.
4983 let mut dlp_found = false;
4984 let mut injection_found = false;
4985
4986 // DLP scanning on responses
4987 if state.response_dlp_enabled {
4988 let dlp_findings = scan_response_for_secrets(&json_val);
4989 if !dlp_findings.is_empty() {
4990 dlp_found = true;
4991 for finding in &dlp_findings {
4992 record_dlp_finding(&finding.pattern_name);
4993 }
4994
4995 let patterns: Vec<String> = dlp_findings
4996 .iter()
4997 .map(|f| format!("{}:{}", f.pattern_name, f.location))
4998 .collect();
4999
5000 tracing::warn!(
5001 "SECURITY: Secrets in WS response! Session: {}, Findings: {:?}",
5002 session_id,
5003 patterns,
5004 );
5005
5006 let verdict = if state.response_dlp_blocking {
5007 Verdict::Deny {
5008 reason: format!("WS response DLP blocked: {patterns:?}"),
5009 }
5010 } else {
5011 Verdict::Allow
5012 };
5013
5014 let action = Action::new(
5015 "vellaveto",
5016 "ws_response_dlp_scan",
5017 json!({
5018 "findings": patterns,
5019 "session": session_id,
5020 "transport": "websocket",
5021 }),
5022 );
5023 let dlp_security_context =
5024 super::helpers::response_dlp_security_context(
5025 tracked_tool_name.as_deref(),
5026 &json_val,
5027 state.response_dlp_blocking,
5028 );
5029 let envelope = build_secondary_acis_envelope_with_security_context(
5030 &action,
5031 &verdict,
5032 DecisionOrigin::Dlp,
5033 "websocket",
5034 Some(&session_id),
5035 Some(&dlp_security_context),
5036 );
5037 if let Err(e) = state
5038 .audit
5039 .log_entry_with_acis(
5040 &action,
5041 &verdict,
5042 json!({
5043 "source": "ws_proxy",
5044 "event": "ws_response_dlp_alert",
5045 }),
5046 envelope,
5047 )
5048 .await
5049 {
5050 tracing::warn!("Failed to audit WS DLP: {}", e);
5051 }
5052
5053 if state.response_dlp_blocking {
5054 // Send error response instead
5055 let id = json_val.get("id");
5056 let error = make_ws_error_response(
5057 id,
5058 -32001,
5059 "Response blocked by DLP policy",
5060 );
5061 let mut sink = client_sink.lock().await;
5062 let _ = sink.send(Message::Text(error.into())).await;
5063 continue;
5064 }
5065 }
5066 }
5067
5068 // Injection scanning
5069 if !state.injection_disabled {
5070 let text_to_scan = extract_scannable_text(&json_val);
5071 if !text_to_scan.is_empty() {
5072 let injection_matches: Vec<String> =
5073 if let Some(ref scanner) = state.injection_scanner {
5074 scanner
5075 .inspect(&text_to_scan)
5076 .into_iter()
5077 .map(|s| s.to_string())
5078 .collect()
5079 } else {
5080 inspect_for_injection(&text_to_scan)
5081 .into_iter()
5082 .map(|s| s.to_string())
5083 .collect()
5084 };
5085
5086 if !injection_matches.is_empty() {
5087 injection_found = true;
5088 tracing::warn!(
5089 "SECURITY: Injection in WS response! Session: {}, Patterns: {:?}",
5090 session_id,
5091 injection_matches,
5092 );
5093
5094 let verdict = if state.injection_blocking {
5095 Verdict::Deny {
5096 reason: format!(
5097 "WS response injection blocked: {injection_matches:?}"
5098 ),
5099 }
5100 } else {
5101 Verdict::Allow
5102 };
5103
5104 let action = Action::new(
5105 "vellaveto",
5106 "ws_response_injection",
5107 json!({
5108 "matched_patterns": injection_matches,
5109 "session": session_id,
5110 "transport": "websocket",
5111 }),
5112 );
5113 let injection_security_context =
5114 super::helpers::response_injection_security_context(
5115 tracked_tool_name.as_deref(),
5116 &json_val,
5117 state.injection_blocking,
5118 "response_injection",
5119 );
5120 let envelope = build_secondary_acis_envelope_with_security_context(
5121 &action,
5122 &verdict,
5123 DecisionOrigin::InjectionScanner,
5124 "websocket",
5125 Some(&session_id),
5126 Some(&injection_security_context),
5127 );
5128 if let Err(e) = state
5129 .audit
5130 .log_entry_with_acis(
5131 &action,
5132 &verdict,
5133 json!({
5134 "source": "ws_proxy",
5135 "event": "ws_injection_detected",
5136 }),
5137 envelope,
5138 )
5139 .await
5140 {
5141 tracing::warn!("Failed to audit WS injection: {}", e);
5142 }
5143
5144 if state.injection_blocking {
5145 let id = json_val.get("id");
5146 let error = make_ws_error_response(
5147 id,
5148 -32001,
5149 "Response blocked: injection detected",
5150 );
5151 let mut sink = client_sink.lock().await;
5152 let _ = sink.send(Message::Text(error.into())).await;
5153 continue;
5154 }
5155 }
5156 }
5157 }
5158
5159 // SECURITY (FIND-R46-007): Rug-pull detection on tools/list responses.
5160 // Check if this is a response to a tools/list request and extract
5161 // annotations for rug-pull detection.
5162 if json_val.get("result").is_some() {
5163 // Check if result contains "tools" array (tools/list response)
5164 if json_val
5165 .get("result")
5166 .and_then(|r| r.get("tools"))
5167 .and_then(|t| t.as_array())
5168 .is_some()
5169 {
5170 super::helpers::extract_annotations_from_response(
5171 &json_val,
5172 &session_id,
5173 &state.sessions,
5174 &state.audit,
5175 &state.known_tools,
5176 )
5177 .await;
5178
5179 // Verify manifest if configured
5180 if let Some(ref manifest_config) = state.manifest_config {
5181 super::helpers::verify_manifest_from_response(
5182 &json_val,
5183 &session_id,
5184 &state.sessions,
5185 manifest_config,
5186 &state.audit,
5187 )
5188 .await;
5189 }
5190
5191 // SECURITY (FIND-R130-003): Scan tool descriptions for embedded
5192 // injection. Parity with HTTP upstream handler (upstream.rs:648-698).
5193 if !state.injection_disabled {
5194 let desc_findings =
5195 if let Some(ref scanner) = state.injection_scanner {
5196 scan_tool_descriptions_with_scanner(&json_val, scanner)
5197 } else {
5198 scan_tool_descriptions(&json_val)
5199 };
5200 for finding in &desc_findings {
5201 injection_found = true;
5202 tracing::warn!(
5203 "SECURITY: Injection in tool '{}' description! \
5204 Session: {}, Patterns: {:?}",
5205 finding.tool_name,
5206 session_id,
5207 finding.matched_patterns
5208 );
5209 let action = Action::new(
5210 "vellaveto",
5211 "tool_description_injection",
5212 json!({
5213 "tool": finding.tool_name,
5214 "matched_patterns": finding.matched_patterns,
5215 "session": session_id,
5216 "transport": "websocket",
5217 "blocking": state.injection_blocking,
5218 }),
5219 );
5220 let tool_desc_verdict = Verdict::Deny {
5221 reason: format!(
5222 "Tool '{}' description contains injection: {:?}",
5223 finding.tool_name, finding.matched_patterns
5224 ),
5225 };
5226 let desc_security_context =
5227 super::helpers::tool_discovery_integrity_security_context(
5228 &finding.tool_name,
5229 vellaveto_types::ContextChannel::CommandLike,
5230 "tool_description_injection",
5231 true,
5232 );
5233 let envelope =
5234 build_secondary_acis_envelope_with_security_context(
5235 &action,
5236 &tool_desc_verdict,
5237 DecisionOrigin::InjectionScanner,
5238 "websocket",
5239 Some(&session_id),
5240 Some(&desc_security_context),
5241 );
5242 if let Err(e) = state
5243 .audit
5244 .log_entry_with_acis(
5245 &action,
5246 &tool_desc_verdict,
5247 json!({
5248 "source": "ws_proxy",
5249 "event": "tool_description_injection",
5250 }),
5251 envelope,
5252 )
5253 .await
5254 {
5255 tracing::warn!(
5256 "Failed to audit WS tool description injection: {}",
5257 e
5258 );
5259 }
5260 }
5261 if !desc_findings.is_empty() && state.injection_blocking {
5262 let id = json_val.get("id");
5263 let error = make_ws_error_response(
5264 id,
5265 -32001,
5266 "Response blocked: suspicious content in tool descriptions",
5267 );
5268 let mut sink = client_sink.lock().await;
5269 let _ = sink.send(Message::Text(error.into())).await;
5270 continue;
5271 }
5272 }
5273 }
5274 }
5275
5276 // SECURITY: Enforce output schema on WS structuredContent.
5277 // SECURITY (FIND-R154-004): Track schema violations for the
5278 // record_response guard below, even in non-blocking mode.
5279 let schema_violation_found = validate_ws_structured_content_response(
5280 &json_val,
5281 &state,
5282 &session_id,
5283 tracked_tool_name.as_deref(),
5284 )
5285 .await;
5286 if schema_violation_found {
5287 let id = json_val.get("id");
5288 let error = make_ws_error_response(
5289 id,
5290 -32001,
5291 "Response blocked: output schema validation failed",
5292 );
5293 let mut sink = client_sink.lock().await;
5294 let _ = sink.send(Message::Text(error.into())).await;
5295 continue;
5296 }
5297
5298 // SECURITY (FIND-R75-003, FIND-R154-004): Record response
5299 // fingerprints for memory poisoning detection. Skip recording
5300 // when DLP, injection, or schema violation was detected (even
5301 // in log-only mode) to avoid poisoning the tracker with tainted
5302 // data. Parity with stdio relay (relay.rs:2919).
5303 if !dlp_found && !injection_found && !schema_violation_found {
5304 if let Some(mut session) = state.sessions.get_mut(&session_id) {
5305 session.memory_tracker.record_response(&json_val);
5306 }
5307 }
5308
5309 // SECURITY (FIND-R46-WS-004): Audit log forwarded upstream→client text messages
5310 {
5311 let msg_type = if json_val.get("result").is_some() {
5312 "response"
5313 } else if json_val.get("error").is_some() {
5314 "error_response"
5315 } else if json_val.get("method").is_some() {
5316 "notification"
5317 } else {
5318 "unknown"
5319 };
5320 let action = Action::new(
5321 "vellaveto",
5322 "ws_forward_upstream_message",
5323 json!({
5324 "message_type": msg_type,
5325 "session": session_id,
5326 "transport": "websocket",
5327 "direction": "upstream_to_client",
5328 }),
5329 );
5330 let message_security_context =
5331 super::helpers::protocol_message_forward_security_context(
5332 &json_val,
5333 "ws_upstream_message_forwarded",
5334 );
5335 let envelope = build_secondary_acis_envelope_with_security_context(
5336 &action,
5337 &Verdict::Allow,
5338 DecisionOrigin::PolicyEngine,
5339 "websocket",
5340 Some(&session_id),
5341 Some(&message_security_context),
5342 );
5343 if let Err(e) = state
5344 .audit
5345 .log_entry_with_acis(
5346 &action,
5347 &Verdict::Allow,
5348 json!({
5349 "source": "ws_proxy",
5350 "event": "ws_upstream_message_forwarded",
5351 }),
5352 envelope,
5353 )
5354 .await
5355 {
5356 tracing::warn!("Failed to audit WS upstream message forward: {}", e);
5357 }
5358 }
5359
5360 // SECURITY (FIND-R48-001): Fail-closed on canonicalization failure.
5361 if state.canonicalize {
5362 match serde_json::to_string(&json_val) {
5363 Ok(canonical) => canonical,
5364 Err(e) => {
5365 tracing::error!(
5366 "SECURITY: WS response canonicalization failed: {}",
5367 e
5368 );
5369 continue;
5370 }
5371 }
5372 } else {
5373 text.to_string()
5374 }
5375 } else {
5376 // SECURITY (FIND-R166-001): Non-JSON upstream text must still be
5377 // scanned for DLP/injection before forwarding. A malicious upstream
5378 // could exfiltrate secrets or inject payloads via non-JSON frames.
5379 let text_str: &str = &text;
5380 // SECURITY (FIND-R168-001): DLP scan with audit logging parity.
5381 if state.response_dlp_enabled {
5382 let findings = scan_text_for_secrets(text_str, "ws.upstream.non_json_text");
5383 if !findings.is_empty() {
5384 let patterns: Vec<String> = findings
5385 .iter()
5386 .map(|f| format!("{}:{}", f.pattern_name, f.location))
5387 .collect();
5388 tracing::warn!(
5389 session_id = %session_id,
5390 "DLP: non-JSON upstream text contains sensitive data ({} findings)",
5391 findings.len(),
5392 );
5393 let verdict = if state.response_dlp_blocking {
5394 Verdict::Deny {
5395 reason: format!("WS non-JSON DLP: {patterns:?}"),
5396 }
5397 } else {
5398 Verdict::Allow
5399 };
5400 let action = Action::new(
5401 "vellaveto",
5402 "ws_nonjson_dlp_scan",
5403 json!({ "findings": patterns, "session": session_id, "transport": "websocket" }),
5404 );
5405 // SECURITY (SE-004): Log audit failures instead of silently discarding.
5406 let text_security_context = super::helpers::text_dlp_security_context(
5407 text_str,
5408 state.response_dlp_blocking,
5409 "ws_nonjson_dlp",
5410 );
5411 let envelope = build_secondary_acis_envelope_with_security_context(
5412 &action,
5413 &verdict,
5414 DecisionOrigin::Dlp,
5415 "websocket",
5416 Some(&session_id),
5417 Some(&text_security_context),
5418 );
5419 if let Err(e) = state.audit.log_entry_with_acis(
5420 &action, &verdict,
5421 json!({ "source": "ws_proxy", "event": "ws_nonjson_dlp_alert" }),
5422 envelope,
5423 ).await {
5424 tracing::error!(
5425 session_id = %session_id,
5426 error = %e,
5427 "AUDIT FAILURE: failed to log ws_nonjson_dlp_alert"
5428 );
5429 }
5430 if state.response_dlp_blocking {
5431 continue;
5432 }
5433 }
5434 }
5435 // SECURITY (FIND-R168-002): Injection scan with log-only mode
5436 // parity. Always log detections; only block when injection_blocking.
5437 {
5438 let alerts: Vec<String> = if let Some(ref scanner) = state.injection_scanner
5439 {
5440 scanner
5441 .inspect(text_str)
5442 .into_iter()
5443 .map(|s| s.to_string())
5444 .collect()
5445 } else {
5446 inspect_for_injection(text_str)
5447 .into_iter()
5448 .map(|s| s.to_string())
5449 .collect()
5450 };
5451 if !alerts.is_empty() {
5452 tracing::warn!(
5453 session_id = %session_id,
5454 "Injection: non-JSON upstream text ({} alerts), blocking={}",
5455 alerts.len(), state.injection_blocking,
5456 );
5457 let verdict = if state.injection_blocking {
5458 Verdict::Deny {
5459 reason: format!(
5460 "WS non-JSON injection: {} alerts",
5461 alerts.len()
5462 ),
5463 }
5464 } else {
5465 Verdict::Allow
5466 };
5467 let action = Action::new(
5468 "vellaveto",
5469 "ws_nonjson_injection_scan",
5470 json!({ "alerts": alerts.len(), "session": session_id, "transport": "websocket" }),
5471 );
5472 // SECURITY (SE-004): Log audit failures instead of silently discarding.
5473 let text_security_context =
5474 super::helpers::text_injection_security_context(
5475 text_str,
5476 state.injection_blocking,
5477 "ws_nonjson_injection",
5478 );
5479 let envelope = build_secondary_acis_envelope_with_security_context(
5480 &action,
5481 &verdict,
5482 DecisionOrigin::InjectionScanner,
5483 "websocket",
5484 Some(&session_id),
5485 Some(&text_security_context),
5486 );
5487 if let Err(e) = state.audit.log_entry_with_acis(
5488 &action, &verdict,
5489 json!({ "source": "ws_proxy", "event": "ws_nonjson_injection_alert" }),
5490 envelope,
5491 ).await {
5492 tracing::error!(
5493 session_id = %session_id,
5494 error = %e,
5495 "AUDIT FAILURE: failed to log ws_nonjson_injection_alert"
5496 );
5497 }
5498 if state.injection_blocking {
5499 continue;
5500 }
5501 }
5502 }
5503 text.to_string()
5504 };
5505
5506 let mut sink = client_sink.lock().await;
5507 if let Err(e) = sink.send(Message::Text(forward.into())).await {
5508 tracing::debug!("Failed to send to client: {}", e);
5509 break;
5510 }
5511 }
5512 tokio_tungstenite::tungstenite::Message::Binary(data) => {
5513 // SECURITY (FIND-R46-WS-002): DLP scanning on upstream binary frames.
5514 // Binary from upstream is unusual for JSON-RPC but must be scanned
5515 // before being dropped, to detect and audit secret exfiltration attempts
5516 // via binary frames.
5517 tracing::warn!(
5518 session_id = %session_id,
5519 "Unexpected binary frame from upstream ({} bytes), scanning before drop",
5520 data.len(),
5521 );
5522
5523 // DLP scan the binary data as UTF-8 lossy
5524 if state.response_dlp_enabled {
5525 let text_repr = String::from_utf8_lossy(&data);
5526 if !text_repr.is_empty() {
5527 let dlp_findings = scan_text_for_secrets(&text_repr, "ws_binary_frame");
5528 if !dlp_findings.is_empty() {
5529 for finding in &dlp_findings {
5530 record_dlp_finding(&finding.pattern_name);
5531 }
5532 let patterns: Vec<String> = dlp_findings
5533 .iter()
5534 .map(|f| format!("{}:{}", f.pattern_name, f.location))
5535 .collect();
5536
5537 tracing::warn!(
5538 "SECURITY: Secrets in WS upstream binary frame! Session: {}, Findings: {:?}",
5539 session_id,
5540 patterns,
5541 );
5542
5543 let action = Action::new(
5544 "vellaveto",
5545 "ws_binary_dlp_scan",
5546 json!({
5547 "findings": patterns,
5548 "session": session_id,
5549 "transport": "websocket",
5550 "direction": "upstream_to_client",
5551 "binary_size": data.len(),
5552 }),
5553 );
5554 let binary_dlp_verdict = Verdict::Deny {
5555 reason: format!("WS binary frame DLP: {patterns:?}"),
5556 };
5557 let text_security_context = super::helpers::text_dlp_security_context(
5558 &text_repr,
5559 true,
5560 "ws_binary_dlp",
5561 );
5562 let envelope = build_secondary_acis_envelope_with_security_context(
5563 &action,
5564 &binary_dlp_verdict,
5565 DecisionOrigin::Dlp,
5566 "websocket",
5567 Some(&session_id),
5568 Some(&text_security_context),
5569 );
5570 if let Err(e) = state
5571 .audit
5572 .log_entry_with_acis(
5573 &action,
5574 &binary_dlp_verdict,
5575 json!({
5576 "source": "ws_proxy",
5577 "event": "ws_binary_dlp_alert",
5578 }),
5579 envelope,
5580 )
5581 .await
5582 {
5583 tracing::warn!("Failed to audit WS binary DLP: {}", e);
5584 }
5585 }
5586 }
5587 }
5588
5589 // SECURITY (FIND-R46-WS-004): Audit log binary frame drop
5590 let action = Action::new(
5591 "vellaveto",
5592 "ws_upstream_binary_dropped",
5593 json!({
5594 "session": session_id,
5595 "transport": "websocket",
5596 "direction": "upstream_to_client",
5597 "binary_size": data.len(),
5598 }),
5599 );
5600 let upstream_binary_verdict = Verdict::Deny {
5601 reason: "Binary frames not supported for JSON-RPC".to_string(),
5602 };
5603 let upstream_binary_security_context =
5604 super::helpers::protocol_binary_rejection_security_context(
5605 "ws_upstream_binary_dropped",
5606 );
5607 let envelope = build_secondary_acis_envelope_with_security_context(
5608 &action,
5609 &upstream_binary_verdict,
5610 DecisionOrigin::PolicyEngine,
5611 "websocket",
5612 Some(&session_id),
5613 Some(&upstream_binary_security_context),
5614 );
5615 if let Err(e) = state
5616 .audit
5617 .log_entry_with_acis(
5618 &action,
5619 &upstream_binary_verdict,
5620 json!({
5621 "source": "ws_proxy",
5622 "event": "ws_upstream_binary_dropped",
5623 }),
5624 envelope,
5625 )
5626 .await
5627 {
5628 tracing::warn!("Failed to audit WS upstream binary drop: {}", e);
5629 }
5630 }
5631 tokio_tungstenite::tungstenite::Message::Ping(data) => {
5632 // Forward ping as pong to upstream (handled by tungstenite)
5633 let _ = data; // tungstenite auto-responds to pings
5634 }
5635 tokio_tungstenite::tungstenite::Message::Pong(_) => {}
5636 tokio_tungstenite::tungstenite::Message::Close(_) => {
5637 tracing::debug!(session_id = %session_id, "Upstream sent close frame");
5638 break;
5639 }
5640 tokio_tungstenite::tungstenite::Message::Frame(_) => {
5641 // Raw frame — ignore
5642 }
5643 }
5644 }
5645}
5646
5647/// Convert an HTTP URL to a WebSocket URL.
5648///
5649/// `http://` → `ws://`, `https://` → `wss://`.
5650///
5651/// SECURITY (FIND-R124-001): Only allows http/https/ws/wss schemes.
5652/// Unknown schemes (ftp://, file://, gopher://) are rejected with a
5653/// warning and fall back to the original URL prefixed with `ws://`
5654/// to maintain fail-closed behavior. This gives parity with scheme
5655/// validation in HTTP and gRPC transports (FIND-R42-015).
5656pub fn convert_to_ws_url(http_url: &str) -> String {
5657 if let Some(rest) = http_url.strip_prefix("https://") {
5658 format!("wss://{rest}")
5659 } else if let Some(rest) = http_url.strip_prefix("http://") {
5660 format!("ws://{rest}")
5661 } else if http_url.starts_with("wss://") || http_url.starts_with("ws://") {
5662 // Already a WebSocket URL — use as-is
5663 http_url.to_string()
5664 } else {
5665 // SECURITY (FIND-R124-001): Reject unknown schemes. Log warning
5666 // and return a URL that will fail to connect safely rather than
5667 // connecting to an unintended scheme (e.g., ftp://, file://).
5668 // SECURITY (FIND-R166-003): Sanitize logged value to prevent log injection
5669 // from URLs with control characters (possible in gateway mode).
5670 tracing::warn!(
5671 "convert_to_ws_url: rejecting URL with unsupported scheme: {}",
5672 vellaveto_types::sanitize_for_log(
5673 http_url.split("://").next().unwrap_or("unknown"),
5674 128,
5675 )
5676 );
5677 // Return invalid URL that will fail at connect_async()
5678 format!("ws://invalid-scheme-rejected.localhost/{}", http_url.len())
5679 }
5680}
5681
5682/// Connect to an upstream WebSocket server.
5683///
5684/// Returns the split WebSocket stream or an error.
5685async fn connect_upstream_ws(
5686 url: &str,
5687) -> Result<
5688 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
5689 String,
5690> {
5691 let connect_timeout = Duration::from_secs(10);
5692 match tokio::time::timeout(connect_timeout, tokio_tungstenite::connect_async(url)).await {
5693 Ok(Ok((ws_stream, _response))) => Ok(ws_stream),
5694 Ok(Err(e)) => Err(format!("WebSocket connection error: {e}")),
5695 Err(_) => Err("WebSocket connection timeout (10s)".to_string()),
5696 }
5697}
5698
5699/// Register output schemas and validate WS response `structuredContent`.
5700///
5701/// Returns true when the response should be blocked.
5702async fn validate_ws_structured_content_response(
5703 json_val: &Value,
5704 state: &ProxyState,
5705 session_id: &str,
5706 tracked_tool_name: Option<&str>,
5707) -> bool {
5708 // Keep WS behavior aligned with HTTP/SSE paths.
5709 state
5710 .output_schema_registry
5711 .register_from_tools_list(json_val);
5712
5713 let Some(result) = json_val.get("result") else {
5714 return false;
5715 };
5716 let Some(structured) = result.get("structuredContent") else {
5717 return false;
5718 };
5719
5720 let meta_tool_name = result
5721 .get("_meta")
5722 .and_then(|m| m.get("tool"))
5723 .and_then(|t| t.as_str());
5724 let tool_name = match (meta_tool_name, tracked_tool_name) {
5725 (Some(meta), Some(tracked)) if !meta.eq_ignore_ascii_case(tracked) => {
5726 tracing::warn!(
5727 "SECURITY: WS structuredContent tool mismatch (meta='{}', tracked='{}'); using tracked tool name",
5728 meta,
5729 tracked
5730 );
5731 tracked
5732 }
5733 (Some(meta), _) => meta,
5734 (None, Some(tracked)) => tracked,
5735 (None, None) => "unknown",
5736 };
5737
5738 match state.output_schema_registry.validate(tool_name, structured) {
5739 ValidationResult::Invalid { violations } => {
5740 tracing::warn!(
5741 "SECURITY: WS structuredContent validation failed for tool '{}': {:?}",
5742 tool_name,
5743 violations
5744 );
5745 let action = Action::new(
5746 "vellaveto",
5747 "output_schema_violation",
5748 json!({
5749 "tool": tool_name,
5750 "violations": violations,
5751 "session": session_id,
5752 "transport": "websocket",
5753 }),
5754 );
5755 let schema_verdict = Verdict::Deny {
5756 reason: format!("WS structuredContent validation failed: {violations:?}"),
5757 };
5758 let schema_security_context =
5759 super::helpers::output_schema_violation_security_context(Some(tool_name), true);
5760 let envelope = build_secondary_acis_envelope_with_security_context(
5761 &action,
5762 &schema_verdict,
5763 DecisionOrigin::PolicyEngine,
5764 "websocket",
5765 Some(session_id),
5766 Some(&schema_security_context),
5767 );
5768 if let Err(e) = state
5769 .audit
5770 .log_entry_with_acis(
5771 &action,
5772 &schema_verdict,
5773 json!({"source": "ws_proxy", "event": "output_schema_violation_ws"}),
5774 envelope,
5775 )
5776 .await
5777 {
5778 tracing::warn!("Failed to audit WS output schema violation: {}", e);
5779 }
5780 true
5781 }
5782 ValidationResult::Valid => {
5783 tracing::debug!("WS structuredContent validated for tool '{}'", tool_name);
5784 false
5785 }
5786 ValidationResult::NoSchema => {
5787 tracing::debug!(
5788 "No output schema registered for WS tool '{}', skipping validation",
5789 tool_name
5790 );
5791 false
5792 }
5793 }
5794}
5795
5796// NOTE: build_ws_evaluation_context() was removed in FIND-R130-002 fix.
5797// All callers now build EvaluationContext inline inside the DashMap shard
5798// lock to prevent TOCTOU races on call_counts/action_history.
5799
5800/// Check per-connection rate limit. Returns true if within limit.
5801fn check_rate_limit(
5802 counter: &AtomicU64,
5803 window_start: &std::sync::Mutex<std::time::Instant>,
5804 max_per_sec: u32,
5805) -> bool {
5806 // SECURITY (FIND-R182-006): Fail-closed — zero rate limit blocks all messages.
5807 // Previously returned true (fail-open), which disabled rate limiting entirely.
5808 if max_per_sec == 0 {
5809 return false;
5810 }
5811
5812 let now = std::time::Instant::now();
5813 let mut start = match window_start.lock() {
5814 Ok(guard) => guard,
5815 Err(e) => {
5816 tracing::error!("WS rate limiter mutex poisoned — fail-closed: {}", e);
5817 return false;
5818 }
5819 };
5820
5821 if now.duration_since(*start) >= Duration::from_secs(1) {
5822 // Reset window
5823 *start = now;
5824 // SECURITY (FIND-R55-WS-003): Use SeqCst for security-critical rate limit counter.
5825 counter.store(1, Ordering::SeqCst);
5826 true
5827 } else {
5828 // SECURITY (FIND-R182-003): saturating arithmetic prevents overflow wrap-to-zero.
5829 // SECURITY (FIND-R155-WS-001): Conditional atomic increment — only increment if
5830 // within limit, reject otherwise. This eliminates the TOCTOU gap between
5831 // load()+compare and also prevents counter inflation from rejected requests.
5832 // The closure returns None when limit is reached, causing fetch_update to fail
5833 // without modifying the counter.
5834 let limit = max_per_sec as u64;
5835 match counter.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| {
5836 if v >= limit {
5837 None // Limit reached — do not increment
5838 } else {
5839 Some(v.saturating_add(1))
5840 }
5841 }) {
5842 Ok(_prev) => true, // Within limit, counter was incremented
5843 Err(_) => false, // Limit exceeded, counter unchanged
5844 }
5845 }
5846}
5847
5848/// Extract scannable text from a JSON-RPC request for injection scanning.
5849///
5850/// SECURITY (FIND-R46-WS-001): Scans tool call arguments, resource URIs,
5851/// and sampling request content for injection payloads in the client→upstream
5852/// direction. Matches the HTTP proxy's request-side injection scanning.
5853fn extract_scannable_text_from_request(json_val: &Value) -> String {
5854 let mut text_parts = Vec::new();
5855
5856 // SECURITY (FIND-R224-002): Recursively scan the entire `params` subtree,
5857 // not just specific fields. Previous narrow extraction missed injection
5858 // payloads in TaskRequest (params.message), ExtensionMethod, and other
5859 // message types with non-standard parameter structures. This gives parity
5860 // with the HTTP handler's `extract_passthrough_text_for_injection` which
5861 // scans all of params recursively.
5862 if let Some(params) = json_val.get("params") {
5863 extract_strings_recursive(params, &mut text_parts, 0);
5864 }
5865
5866 // SECURITY (FIND-R224-006): Also scan `result` field for injection payloads.
5867 // JSON-RPC response messages (sampling/elicitation replies) carry data in
5868 // `result` rather than `params`. Without this, upstream injection payloads
5869 // in response results bypass WS scanning while being caught by HTTP/gRPC.
5870 if let Some(result) = json_val.get("result") {
5871 extract_strings_recursive(result, &mut text_parts, 0);
5872 }
5873
5874 text_parts.join("\n")
5875}
5876
5877/// Recursively extract string values from a JSON value, with depth and count bounds.
5878///
5879/// SECURITY (FIND-R48-007): Added MAX_PARTS to prevent memory amplification
5880/// from messages containing many short strings.
5881fn extract_strings_recursive(val: &Value, parts: &mut Vec<String>, depth: usize) {
5882 // SECURITY (FIND-R154-005): Use depth 32 matching shared MAX_SCAN_DEPTH
5883 // in scanner_base.rs. Previous limit of 10 allowed injection payloads
5884 // nested between depth 11-32 to evade WS scanning while being caught
5885 // by the stdio relay and DLP scanner (both use MAX_SCAN_DEPTH=32).
5886 const MAX_DEPTH: usize = 32;
5887 const MAX_PARTS: usize = 1000;
5888 if depth > MAX_DEPTH || parts.len() >= MAX_PARTS {
5889 return;
5890 }
5891 match val {
5892 Value::String(s) => parts.push(s.clone()),
5893 Value::Array(arr) => {
5894 for item in arr {
5895 extract_strings_recursive(item, parts, depth + 1);
5896 }
5897 }
5898 Value::Object(map) => {
5899 for (key, v) in map {
5900 // SECURITY (FIND-R154-003): Also scan object keys for injection
5901 // payloads. Parity with stdio relay's traverse_json_strings_with_keys.
5902 // Without this, attackers can hide injection in JSON key names.
5903 if parts.len() < MAX_PARTS {
5904 parts.push(key.clone());
5905 }
5906 extract_strings_recursive(v, parts, depth + 1);
5907 }
5908 }
5909 _ => {}
5910 }
5911}
5912
5913/// Extract scannable text from a JSON-RPC response for injection scanning.
5914///
5915/// SECURITY (FIND-R130-004): Delegates to the shared `extract_text_from_result()`
5916/// which covers `resource.text`, `resource.blob` (base64-decoded), `annotations`,
5917/// and `_meta` — all missing from the previous WS-only implementation.
5918fn extract_scannable_text(json_val: &Value) -> String {
5919 let mut text_parts = Vec::new();
5920
5921 // Scan result via shared extraction (covers content[].text, resource.text,
5922 // resource.blob, annotations, instructionsForUser, structuredContent, _meta).
5923 if let Some(result) = json_val.get("result") {
5924 let result_text = super::inspection::extract_text_from_result(result);
5925 if !result_text.is_empty() {
5926 text_parts.push(result_text);
5927 }
5928 }
5929
5930 // Scan error messages (not covered by extract_text_from_result)
5931 if let Some(error) = json_val.get("error") {
5932 if let Some(msg) = error.get("message").and_then(|m| m.as_str()) {
5933 text_parts.push(msg.to_string());
5934 }
5935 // SECURITY (FIND-R168-005): Use as_str() first to avoid wrapping
5936 // string values in JSON quotes. Parity with scanner_base.rs.
5937 if let Some(data) = error.get("data") {
5938 if let Some(s) = data.as_str() {
5939 text_parts.push(s.to_string());
5940 } else {
5941 text_parts.push(data.to_string());
5942 }
5943 }
5944 }
5945
5946 text_parts.join("\n")
5947}
5948
5949/// Test-only compatibility helper for approval-store assertions.
5950#[cfg(test)]
5951async fn create_ws_approval(
5952 state: &ProxyState,
5953 session_id: &str,
5954 action: &Action,
5955 reason: &str,
5956) -> Option<String> {
5957 let store = state.approval_store.as_ref()?;
5958 let requested_by = state.sessions.get_mut(session_id).and_then(|session| {
5959 session
5960 .agent_identity
5961 .as_ref()
5962 .and_then(|identity| identity.subject.clone())
5963 .or_else(|| session.oauth_subject.clone())
5964 });
5965 match store
5966 .create(
5967 action.clone(),
5968 reason.to_string(),
5969 requested_by,
5970 Some(session_id.to_string()),
5971 Some(vellaveto_engine::acis::fingerprint_action(action)),
5972 )
5973 .await
5974 {
5975 Ok(id) => Some(id),
5976 Err(e) => {
5977 tracing::error!(
5978 session_id = %session_id,
5979 "Failed to create WebSocket approval (fail-closed): {}",
5980 e
5981 );
5982 None
5983 }
5984 }
5985}
5986
5987/// Build a JSON-RPC error response string for WebSocket with optional `error.data`.
5988fn make_ws_error_response_with_data(
5989 id: Option<&Value>,
5990 code: i64,
5991 message: &str,
5992 data: Option<Value>,
5993) -> String {
5994 let mut error = serde_json::Map::new();
5995 error.insert("code".to_string(), Value::from(code));
5996 error.insert("message".to_string(), Value::from(message));
5997 if let Some(data) = data {
5998 error.insert("data".to_string(), data);
5999 }
6000 let response = json!({
6001 "jsonrpc": "2.0",
6002 "id": id.cloned().unwrap_or(Value::Null),
6003 "error": Value::Object(error),
6004 });
6005 serde_json::to_string(&response).unwrap_or_else(|_| {
6006 format!(r#"{{"jsonrpc":"2.0","error":{{"code":{code},"message":"{message}"}},"id":null}}"#)
6007 })
6008}
6009
6010/// Build a JSON-RPC error response string for WebSocket.
6011fn make_ws_error_response(id: Option<&Value>, code: i64, message: &str) -> String {
6012 make_ws_error_response_with_data(id, code, message, None)
6013}
6014
6015#[cfg(test)]
6016mod tests;