sentinel_agent_protocol/
protocol.rs

1//! Agent protocol types and constants.
2//!
3//! This module defines the wire protocol types for communication between
4//! the proxy dataplane and external processing agents.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// Agent protocol version
10pub const PROTOCOL_VERSION: u32 = 1;
11
12/// Maximum message size (10MB)
13pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
14
15/// Agent event type
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum EventType {
19    /// Request headers received
20    RequestHeaders,
21    /// Request body chunk received
22    RequestBodyChunk,
23    /// Response headers received
24    ResponseHeaders,
25    /// Response body chunk received
26    ResponseBodyChunk,
27    /// Request/response complete (for logging)
28    RequestComplete,
29    /// WebSocket frame received (after upgrade)
30    WebSocketFrame,
31}
32
33/// Agent decision
34#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
35#[serde(rename_all = "snake_case")]
36pub enum Decision {
37    /// Allow the request/response to continue
38    #[default]
39    Allow,
40    /// Block the request/response
41    Block {
42        /// HTTP status code to return
43        status: u16,
44        /// Optional response body
45        body: Option<String>,
46        /// Optional response headers
47        headers: Option<HashMap<String, String>>,
48    },
49    /// Redirect the request
50    Redirect {
51        /// Redirect URL
52        url: String,
53        /// HTTP status code (301, 302, 303, 307, 308)
54        status: u16,
55    },
56    /// Challenge the client (e.g., CAPTCHA)
57    Challenge {
58        /// Challenge type
59        challenge_type: String,
60        /// Challenge parameters
61        params: HashMap<String, String>,
62    },
63}
64
65/// Header modification operation
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67#[serde(rename_all = "snake_case")]
68pub enum HeaderOp {
69    /// Set a header (replace if exists)
70    Set { name: String, value: String },
71    /// Add a header (append if exists)
72    Add { name: String, value: String },
73    /// Remove a header
74    Remove { name: String },
75}
76
77// ============================================================================
78// Body Mutation
79// ============================================================================
80
81/// Body mutation from agent
82///
83/// Allows agents to modify body content during streaming:
84/// - `None` data: pass through original chunk unchanged
85/// - `Some(empty)`: drop the chunk entirely
86/// - `Some(data)`: replace chunk with modified content
87#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
88pub struct BodyMutation {
89    /// Modified body data (base64 encoded for JSON transport)
90    ///
91    /// - `None`: use original chunk unchanged
92    /// - `Some("")`: drop this chunk
93    /// - `Some(data)`: replace chunk with this data
94    pub data: Option<String>,
95
96    /// Chunk index this mutation applies to
97    ///
98    /// Must match the `chunk_index` from the body chunk event.
99    #[serde(default)]
100    pub chunk_index: u32,
101}
102
103impl BodyMutation {
104    /// Create a pass-through mutation (no change)
105    pub fn pass_through(chunk_index: u32) -> Self {
106        Self {
107            data: None,
108            chunk_index,
109        }
110    }
111
112    /// Create a mutation that drops the chunk
113    pub fn drop_chunk(chunk_index: u32) -> Self {
114        Self {
115            data: Some(String::new()),
116            chunk_index,
117        }
118    }
119
120    /// Create a mutation that replaces the chunk
121    pub fn replace(chunk_index: u32, data: String) -> Self {
122        Self {
123            data: Some(data),
124            chunk_index,
125        }
126    }
127
128    /// Check if this mutation passes through unchanged
129    pub fn is_pass_through(&self) -> bool {
130        self.data.is_none()
131    }
132
133    /// Check if this mutation drops the chunk
134    pub fn is_drop(&self) -> bool {
135        matches!(&self.data, Some(d) if d.is_empty())
136    }
137}
138
139/// Request metadata sent to agents
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct RequestMetadata {
142    /// Correlation ID for request tracing
143    pub correlation_id: String,
144    /// Request ID (internal)
145    pub request_id: String,
146    /// Client IP address
147    pub client_ip: String,
148    /// Client port
149    pub client_port: u16,
150    /// Server name (SNI or Host header)
151    pub server_name: Option<String>,
152    /// Protocol (HTTP/1.1, HTTP/2, etc.)
153    pub protocol: String,
154    /// TLS version if applicable
155    pub tls_version: Option<String>,
156    /// TLS cipher suite if applicable
157    pub tls_cipher: Option<String>,
158    /// Route ID that matched
159    pub route_id: Option<String>,
160    /// Upstream ID
161    pub upstream_id: Option<String>,
162    /// Request start timestamp (RFC3339)
163    pub timestamp: String,
164}
165
166/// Request headers event
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct RequestHeadersEvent {
169    /// Event metadata
170    pub metadata: RequestMetadata,
171    /// HTTP method
172    pub method: String,
173    /// Request URI
174    pub uri: String,
175    /// HTTP headers
176    pub headers: HashMap<String, Vec<String>>,
177}
178
179/// Request body chunk event
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct RequestBodyChunkEvent {
182    /// Correlation ID
183    pub correlation_id: String,
184    /// Body chunk data (base64 encoded for JSON transport)
185    pub data: String,
186    /// Is this the last chunk?
187    pub is_last: bool,
188    /// Total body size if known
189    pub total_size: Option<usize>,
190    /// Chunk index for ordering (0-based)
191    ///
192    /// Used to match mutations to chunks and ensure ordering.
193    #[serde(default)]
194    pub chunk_index: u32,
195    /// Bytes received so far (cumulative)
196    #[serde(default)]
197    pub bytes_received: usize,
198}
199
200/// Response headers event
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct ResponseHeadersEvent {
203    /// Correlation ID
204    pub correlation_id: String,
205    /// HTTP status code
206    pub status: u16,
207    /// HTTP headers
208    pub headers: HashMap<String, Vec<String>>,
209}
210
211/// Response body chunk event
212#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct ResponseBodyChunkEvent {
214    /// Correlation ID
215    pub correlation_id: String,
216    /// Body chunk data (base64 encoded for JSON transport)
217    pub data: String,
218    /// Is this the last chunk?
219    pub is_last: bool,
220    /// Total body size if known
221    pub total_size: Option<usize>,
222    /// Chunk index for ordering (0-based)
223    #[serde(default)]
224    pub chunk_index: u32,
225    /// Bytes sent so far (cumulative)
226    #[serde(default)]
227    pub bytes_sent: usize,
228}
229
230/// Request complete event (for logging/audit)
231#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct RequestCompleteEvent {
233    /// Correlation ID
234    pub correlation_id: String,
235    /// Final HTTP status code
236    pub status: u16,
237    /// Request duration in milliseconds
238    pub duration_ms: u64,
239    /// Request body size
240    pub request_body_size: usize,
241    /// Response body size
242    pub response_body_size: usize,
243    /// Upstream attempts
244    pub upstream_attempts: u32,
245    /// Error if any
246    pub error: Option<String>,
247}
248
249// ============================================================================
250// WebSocket Frame Events
251// ============================================================================
252
253/// WebSocket frame event
254///
255/// Sent to agents after a WebSocket upgrade when frame inspection is enabled.
256/// Each frame is sent individually for inspection.
257#[derive(Debug, Clone, Serialize, Deserialize)]
258pub struct WebSocketFrameEvent {
259    /// Correlation ID (same as the original HTTP upgrade request)
260    pub correlation_id: String,
261    /// Frame opcode: "text", "binary", "ping", "pong", "close", "continuation"
262    pub opcode: String,
263    /// Frame payload (base64 encoded for JSON transport)
264    pub data: String,
265    /// Direction: true = client->server, false = server->client
266    pub client_to_server: bool,
267    /// Frame index for this connection (0-based, per direction)
268    pub frame_index: u64,
269    /// FIN bit - true if final frame of message (for fragmented messages)
270    pub fin: bool,
271    /// Route ID
272    pub route_id: Option<String>,
273    /// Client IP
274    pub client_ip: String,
275}
276
277/// WebSocket opcode
278#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
279#[serde(rename_all = "snake_case")]
280pub enum WebSocketOpcode {
281    /// Continuation frame (0x0)
282    Continuation,
283    /// Text frame (0x1)
284    Text,
285    /// Binary frame (0x2)
286    Binary,
287    /// Connection close (0x8)
288    Close,
289    /// Ping (0x9)
290    Ping,
291    /// Pong (0xA)
292    Pong,
293}
294
295impl WebSocketOpcode {
296    /// Convert opcode to string representation
297    pub fn as_str(&self) -> &'static str {
298        match self {
299            Self::Continuation => "continuation",
300            Self::Text => "text",
301            Self::Binary => "binary",
302            Self::Close => "close",
303            Self::Ping => "ping",
304            Self::Pong => "pong",
305        }
306    }
307
308    /// Parse from byte value
309    pub fn from_u8(value: u8) -> Option<Self> {
310        match value {
311            0x0 => Some(Self::Continuation),
312            0x1 => Some(Self::Text),
313            0x2 => Some(Self::Binary),
314            0x8 => Some(Self::Close),
315            0x9 => Some(Self::Ping),
316            0xA => Some(Self::Pong),
317            _ => None,
318        }
319    }
320
321    /// Convert to byte value
322    pub fn as_u8(&self) -> u8 {
323        match self {
324            Self::Continuation => 0x0,
325            Self::Text => 0x1,
326            Self::Binary => 0x2,
327            Self::Close => 0x8,
328            Self::Ping => 0x9,
329            Self::Pong => 0xA,
330        }
331    }
332}
333
334/// WebSocket frame decision
335///
336/// Agents return this decision for WebSocket frame events.
337#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
338#[serde(rename_all = "snake_case")]
339pub enum WebSocketDecision {
340    /// Allow frame to pass through
341    #[default]
342    Allow,
343    /// Drop this frame silently (don't forward)
344    Drop,
345    /// Close the WebSocket connection
346    Close {
347        /// Close code (RFC 6455 section 7.4.1)
348        code: u16,
349        /// Close reason
350        reason: String,
351    },
352}
353
354/// Agent request message
355#[derive(Debug, Clone, Serialize, Deserialize)]
356pub struct AgentRequest {
357    /// Protocol version
358    pub version: u32,
359    /// Event type
360    pub event_type: EventType,
361    /// Event payload (JSON)
362    pub payload: serde_json::Value,
363}
364
365/// Agent response message
366#[derive(Debug, Clone, Serialize, Deserialize)]
367pub struct AgentResponse {
368    /// Protocol version
369    pub version: u32,
370    /// Decision
371    pub decision: Decision,
372    /// Header modifications for request
373    #[serde(default)]
374    pub request_headers: Vec<HeaderOp>,
375    /// Header modifications for response
376    #[serde(default)]
377    pub response_headers: Vec<HeaderOp>,
378    /// Routing metadata modifications
379    #[serde(default)]
380    pub routing_metadata: HashMap<String, String>,
381    /// Audit metadata
382    #[serde(default)]
383    pub audit: AuditMetadata,
384
385    // ========================================================================
386    // Streaming-specific fields
387    // ========================================================================
388    /// Agent needs more data to make a final decision
389    ///
390    /// When `true`, the current `decision` is provisional and may change
391    /// after processing more body chunks. The proxy should continue
392    /// streaming body data to this agent.
393    ///
394    /// When `false` (default), the decision is final.
395    #[serde(default)]
396    pub needs_more: bool,
397
398    /// Request body mutation (for streaming mode)
399    ///
400    /// If present, applies the mutation to the current request body chunk.
401    /// Only valid for `RequestBodyChunk` events.
402    #[serde(default)]
403    pub request_body_mutation: Option<BodyMutation>,
404
405    /// Response body mutation (for streaming mode)
406    ///
407    /// If present, applies the mutation to the current response body chunk.
408    /// Only valid for `ResponseBodyChunk` events.
409    #[serde(default)]
410    pub response_body_mutation: Option<BodyMutation>,
411
412    /// WebSocket frame decision
413    ///
414    /// Only valid for `WebSocketFrame` events. If not set, defaults to Allow.
415    #[serde(default)]
416    pub websocket_decision: Option<WebSocketDecision>,
417}
418
419impl AgentResponse {
420    /// Create a default allow response
421    pub fn default_allow() -> Self {
422        Self {
423            version: PROTOCOL_VERSION,
424            decision: Decision::Allow,
425            request_headers: vec![],
426            response_headers: vec![],
427            routing_metadata: HashMap::new(),
428            audit: AuditMetadata::default(),
429            needs_more: false,
430            request_body_mutation: None,
431            response_body_mutation: None,
432            websocket_decision: None,
433        }
434    }
435
436    /// Create a block response
437    pub fn block(status: u16, body: Option<String>) -> Self {
438        Self {
439            version: PROTOCOL_VERSION,
440            decision: Decision::Block {
441                status,
442                body,
443                headers: None,
444            },
445            request_headers: vec![],
446            response_headers: vec![],
447            routing_metadata: HashMap::new(),
448            audit: AuditMetadata::default(),
449            needs_more: false,
450            request_body_mutation: None,
451            response_body_mutation: None,
452            websocket_decision: None,
453        }
454    }
455
456    /// Create a redirect response
457    pub fn redirect(url: String, status: u16) -> Self {
458        Self {
459            version: PROTOCOL_VERSION,
460            decision: Decision::Redirect { url, status },
461            request_headers: vec![],
462            response_headers: vec![],
463            routing_metadata: HashMap::new(),
464            audit: AuditMetadata::default(),
465            needs_more: false,
466            request_body_mutation: None,
467            response_body_mutation: None,
468            websocket_decision: None,
469        }
470    }
471
472    /// Create a streaming response indicating more data is needed
473    pub fn needs_more_data() -> Self {
474        Self {
475            version: PROTOCOL_VERSION,
476            decision: Decision::Allow,
477            request_headers: vec![],
478            response_headers: vec![],
479            routing_metadata: HashMap::new(),
480            audit: AuditMetadata::default(),
481            needs_more: true,
482            request_body_mutation: None,
483            response_body_mutation: None,
484            websocket_decision: None,
485        }
486    }
487
488    /// Create a WebSocket allow response
489    pub fn websocket_allow() -> Self {
490        Self {
491            websocket_decision: Some(WebSocketDecision::Allow),
492            ..Self::default_allow()
493        }
494    }
495
496    /// Create a WebSocket drop response (drop the frame, don't forward)
497    pub fn websocket_drop() -> Self {
498        Self {
499            websocket_decision: Some(WebSocketDecision::Drop),
500            ..Self::default_allow()
501        }
502    }
503
504    /// Create a WebSocket close response (close the connection)
505    pub fn websocket_close(code: u16, reason: String) -> Self {
506        Self {
507            websocket_decision: Some(WebSocketDecision::Close { code, reason }),
508            ..Self::default_allow()
509        }
510    }
511
512    /// Set WebSocket decision
513    pub fn with_websocket_decision(mut self, decision: WebSocketDecision) -> Self {
514        self.websocket_decision = Some(decision);
515        self
516    }
517
518    /// Create a streaming response with body mutation
519    pub fn with_request_body_mutation(mut self, mutation: BodyMutation) -> Self {
520        self.request_body_mutation = Some(mutation);
521        self
522    }
523
524    /// Create a streaming response with response body mutation
525    pub fn with_response_body_mutation(mut self, mutation: BodyMutation) -> Self {
526        self.response_body_mutation = Some(mutation);
527        self
528    }
529
530    /// Set needs_more flag
531    pub fn set_needs_more(mut self, needs_more: bool) -> Self {
532        self.needs_more = needs_more;
533        self
534    }
535
536    /// Add a request header modification
537    pub fn add_request_header(mut self, op: HeaderOp) -> Self {
538        self.request_headers.push(op);
539        self
540    }
541
542    /// Add a response header modification
543    pub fn add_response_header(mut self, op: HeaderOp) -> Self {
544        self.response_headers.push(op);
545        self
546    }
547
548    /// Add audit metadata
549    pub fn with_audit(mut self, audit: AuditMetadata) -> Self {
550        self.audit = audit;
551        self
552    }
553}
554
555/// Audit metadata from agent
556#[derive(Debug, Clone, Default, Serialize, Deserialize)]
557pub struct AuditMetadata {
558    /// Tags for logging/metrics
559    #[serde(default)]
560    pub tags: Vec<String>,
561    /// Rule IDs that matched
562    #[serde(default)]
563    pub rule_ids: Vec<String>,
564    /// Confidence score (0.0 - 1.0)
565    pub confidence: Option<f32>,
566    /// Reason codes
567    #[serde(default)]
568    pub reason_codes: Vec<String>,
569    /// Custom metadata
570    #[serde(default)]
571    pub custom: HashMap<String, serde_json::Value>,
572}