sentinel_agent_protocol/
protocol.rs

1//! Agent protocol types and constants.
2//!
3//! This module defines the wire protocol types for communication between
4//! the proxy dataplane and external processing agents.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Version number of the agent wire protocol.
///
/// The response constructors in this module stamp this value into
/// `AgentResponse::version`.
pub const PROTOCOL_VERSION: u32 = 1;

/// Maximum message size in bytes: 10 MiB (10 * 1024 * 1024).
// `1 << 20` is one MiB.
pub const MAX_MESSAGE_SIZE: usize = 10 * (1 << 20);
14
15/// Agent event type
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum EventType {
19    /// Agent configuration (sent once when agent connects)
20    Configure,
21    /// Request headers received
22    RequestHeaders,
23    /// Request body chunk received
24    RequestBodyChunk,
25    /// Response headers received
26    ResponseHeaders,
27    /// Response body chunk received
28    ResponseBodyChunk,
29    /// Request/response complete (for logging)
30    RequestComplete,
31    /// WebSocket frame received (after upgrade)
32    WebSocketFrame,
33}
34
35/// Agent decision
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
37#[serde(rename_all = "snake_case")]
38pub enum Decision {
39    /// Allow the request/response to continue
40    #[default]
41    Allow,
42    /// Block the request/response
43    Block {
44        /// HTTP status code to return
45        status: u16,
46        /// Optional response body
47        body: Option<String>,
48        /// Optional response headers
49        headers: Option<HashMap<String, String>>,
50    },
51    /// Redirect the request
52    Redirect {
53        /// Redirect URL
54        url: String,
55        /// HTTP status code (301, 302, 303, 307, 308)
56        status: u16,
57    },
58    /// Challenge the client (e.g., CAPTCHA)
59    Challenge {
60        /// Challenge type
61        challenge_type: String,
62        /// Challenge parameters
63        params: HashMap<String, String>,
64    },
65}
66
67/// Header modification operation
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "snake_case")]
70pub enum HeaderOp {
71    /// Set a header (replace if exists)
72    Set { name: String, value: String },
73    /// Add a header (append if exists)
74    Add { name: String, value: String },
75    /// Remove a header
76    Remove { name: String },
77}
78
79// ============================================================================
80// Body Mutation
81// ============================================================================
82
83/// Body mutation from agent
84///
85/// Allows agents to modify body content during streaming:
86/// - `None` data: pass through original chunk unchanged
87/// - `Some(empty)`: drop the chunk entirely
88/// - `Some(data)`: replace chunk with modified content
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90pub struct BodyMutation {
91    /// Modified body data (base64 encoded for JSON transport)
92    ///
93    /// - `None`: use original chunk unchanged
94    /// - `Some("")`: drop this chunk
95    /// - `Some(data)`: replace chunk with this data
96    pub data: Option<String>,
97
98    /// Chunk index this mutation applies to
99    ///
100    /// Must match the `chunk_index` from the body chunk event.
101    #[serde(default)]
102    pub chunk_index: u32,
103}
104
105impl BodyMutation {
106    /// Create a pass-through mutation (no change)
107    pub fn pass_through(chunk_index: u32) -> Self {
108        Self {
109            data: None,
110            chunk_index,
111        }
112    }
113
114    /// Create a mutation that drops the chunk
115    pub fn drop_chunk(chunk_index: u32) -> Self {
116        Self {
117            data: Some(String::new()),
118            chunk_index,
119        }
120    }
121
122    /// Create a mutation that replaces the chunk
123    pub fn replace(chunk_index: u32, data: String) -> Self {
124        Self {
125            data: Some(data),
126            chunk_index,
127        }
128    }
129
130    /// Check if this mutation passes through unchanged
131    pub fn is_pass_through(&self) -> bool {
132        self.data.is_none()
133    }
134
135    /// Check if this mutation drops the chunk
136    pub fn is_drop(&self) -> bool {
137        matches!(&self.data, Some(d) if d.is_empty())
138    }
139}
140
141/// Request metadata sent to agents
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct RequestMetadata {
144    /// Correlation ID for request tracing
145    pub correlation_id: String,
146    /// Request ID (internal)
147    pub request_id: String,
148    /// Client IP address
149    pub client_ip: String,
150    /// Client port
151    pub client_port: u16,
152    /// Server name (SNI or Host header)
153    pub server_name: Option<String>,
154    /// Protocol (HTTP/1.1, HTTP/2, etc.)
155    pub protocol: String,
156    /// TLS version if applicable
157    pub tls_version: Option<String>,
158    /// TLS cipher suite if applicable
159    pub tls_cipher: Option<String>,
160    /// Route ID that matched
161    pub route_id: Option<String>,
162    /// Upstream ID
163    pub upstream_id: Option<String>,
164    /// Request start timestamp (RFC3339)
165    pub timestamp: String,
166}
167
168/// Configure event
169///
170/// Sent once when an agent connects, before any request events.
171/// Contains agent-specific configuration from the proxy's config file.
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct ConfigureEvent {
174    /// Agent ID (from proxy config)
175    pub agent_id: String,
176    /// Agent-specific configuration (JSON object)
177    ///
178    /// The structure of this config depends on the agent type.
179    /// Agents should parse this into their own config struct.
180    pub config: serde_json::Value,
181}
182
183/// Request headers event
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct RequestHeadersEvent {
186    /// Event metadata
187    pub metadata: RequestMetadata,
188    /// HTTP method
189    pub method: String,
190    /// Request URI
191    pub uri: String,
192    /// HTTP headers
193    pub headers: HashMap<String, Vec<String>>,
194}
195
196/// Request body chunk event
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct RequestBodyChunkEvent {
199    /// Correlation ID
200    pub correlation_id: String,
201    /// Body chunk data (base64 encoded for JSON transport)
202    pub data: String,
203    /// Is this the last chunk?
204    pub is_last: bool,
205    /// Total body size if known
206    pub total_size: Option<usize>,
207    /// Chunk index for ordering (0-based)
208    ///
209    /// Used to match mutations to chunks and ensure ordering.
210    #[serde(default)]
211    pub chunk_index: u32,
212    /// Bytes received so far (cumulative)
213    #[serde(default)]
214    pub bytes_received: usize,
215}
216
217/// Response headers event
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct ResponseHeadersEvent {
220    /// Correlation ID
221    pub correlation_id: String,
222    /// HTTP status code
223    pub status: u16,
224    /// HTTP headers
225    pub headers: HashMap<String, Vec<String>>,
226}
227
228/// Response body chunk event
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct ResponseBodyChunkEvent {
231    /// Correlation ID
232    pub correlation_id: String,
233    /// Body chunk data (base64 encoded for JSON transport)
234    pub data: String,
235    /// Is this the last chunk?
236    pub is_last: bool,
237    /// Total body size if known
238    pub total_size: Option<usize>,
239    /// Chunk index for ordering (0-based)
240    #[serde(default)]
241    pub chunk_index: u32,
242    /// Bytes sent so far (cumulative)
243    #[serde(default)]
244    pub bytes_sent: usize,
245}
246
247/// Request complete event (for logging/audit)
248#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct RequestCompleteEvent {
250    /// Correlation ID
251    pub correlation_id: String,
252    /// Final HTTP status code
253    pub status: u16,
254    /// Request duration in milliseconds
255    pub duration_ms: u64,
256    /// Request body size
257    pub request_body_size: usize,
258    /// Response body size
259    pub response_body_size: usize,
260    /// Upstream attempts
261    pub upstream_attempts: u32,
262    /// Error if any
263    pub error: Option<String>,
264}
265
266// ============================================================================
267// WebSocket Frame Events
268// ============================================================================
269
270/// WebSocket frame event
271///
272/// Sent to agents after a WebSocket upgrade when frame inspection is enabled.
273/// Each frame is sent individually for inspection.
274#[derive(Debug, Clone, Serialize, Deserialize)]
275pub struct WebSocketFrameEvent {
276    /// Correlation ID (same as the original HTTP upgrade request)
277    pub correlation_id: String,
278    /// Frame opcode: "text", "binary", "ping", "pong", "close", "continuation"
279    pub opcode: String,
280    /// Frame payload (base64 encoded for JSON transport)
281    pub data: String,
282    /// Direction: true = client->server, false = server->client
283    pub client_to_server: bool,
284    /// Frame index for this connection (0-based, per direction)
285    pub frame_index: u64,
286    /// FIN bit - true if final frame of message (for fragmented messages)
287    pub fin: bool,
288    /// Route ID
289    pub route_id: Option<String>,
290    /// Client IP
291    pub client_ip: String,
292}
293
294/// WebSocket opcode
295#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
296#[serde(rename_all = "snake_case")]
297pub enum WebSocketOpcode {
298    /// Continuation frame (0x0)
299    Continuation,
300    /// Text frame (0x1)
301    Text,
302    /// Binary frame (0x2)
303    Binary,
304    /// Connection close (0x8)
305    Close,
306    /// Ping (0x9)
307    Ping,
308    /// Pong (0xA)
309    Pong,
310}
311
312impl WebSocketOpcode {
313    /// Convert opcode to string representation
314    pub fn as_str(&self) -> &'static str {
315        match self {
316            Self::Continuation => "continuation",
317            Self::Text => "text",
318            Self::Binary => "binary",
319            Self::Close => "close",
320            Self::Ping => "ping",
321            Self::Pong => "pong",
322        }
323    }
324
325    /// Parse from byte value
326    pub fn from_u8(value: u8) -> Option<Self> {
327        match value {
328            0x0 => Some(Self::Continuation),
329            0x1 => Some(Self::Text),
330            0x2 => Some(Self::Binary),
331            0x8 => Some(Self::Close),
332            0x9 => Some(Self::Ping),
333            0xA => Some(Self::Pong),
334            _ => None,
335        }
336    }
337
338    /// Convert to byte value
339    pub fn as_u8(&self) -> u8 {
340        match self {
341            Self::Continuation => 0x0,
342            Self::Text => 0x1,
343            Self::Binary => 0x2,
344            Self::Close => 0x8,
345            Self::Ping => 0x9,
346            Self::Pong => 0xA,
347        }
348    }
349}
350
351/// WebSocket frame decision
352///
353/// Agents return this decision for WebSocket frame events.
354#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
355#[serde(rename_all = "snake_case")]
356pub enum WebSocketDecision {
357    /// Allow frame to pass through
358    #[default]
359    Allow,
360    /// Drop this frame silently (don't forward)
361    Drop,
362    /// Close the WebSocket connection
363    Close {
364        /// Close code (RFC 6455 section 7.4.1)
365        code: u16,
366        /// Close reason
367        reason: String,
368    },
369}
370
371/// Agent request message
372#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct AgentRequest {
374    /// Protocol version
375    pub version: u32,
376    /// Event type
377    pub event_type: EventType,
378    /// Event payload (JSON)
379    pub payload: serde_json::Value,
380}
381
382/// Agent response message
383#[derive(Debug, Clone, Serialize, Deserialize)]
384pub struct AgentResponse {
385    /// Protocol version
386    pub version: u32,
387    /// Decision
388    pub decision: Decision,
389    /// Header modifications for request
390    #[serde(default)]
391    pub request_headers: Vec<HeaderOp>,
392    /// Header modifications for response
393    #[serde(default)]
394    pub response_headers: Vec<HeaderOp>,
395    /// Routing metadata modifications
396    #[serde(default)]
397    pub routing_metadata: HashMap<String, String>,
398    /// Audit metadata
399    #[serde(default)]
400    pub audit: AuditMetadata,
401
402    // ========================================================================
403    // Streaming-specific fields
404    // ========================================================================
405    /// Agent needs more data to make a final decision
406    ///
407    /// When `true`, the current `decision` is provisional and may change
408    /// after processing more body chunks. The proxy should continue
409    /// streaming body data to this agent.
410    ///
411    /// When `false` (default), the decision is final.
412    #[serde(default)]
413    pub needs_more: bool,
414
415    /// Request body mutation (for streaming mode)
416    ///
417    /// If present, applies the mutation to the current request body chunk.
418    /// Only valid for `RequestBodyChunk` events.
419    #[serde(default)]
420    pub request_body_mutation: Option<BodyMutation>,
421
422    /// Response body mutation (for streaming mode)
423    ///
424    /// If present, applies the mutation to the current response body chunk.
425    /// Only valid for `ResponseBodyChunk` events.
426    #[serde(default)]
427    pub response_body_mutation: Option<BodyMutation>,
428
429    /// WebSocket frame decision
430    ///
431    /// Only valid for `WebSocketFrame` events. If not set, defaults to Allow.
432    #[serde(default)]
433    pub websocket_decision: Option<WebSocketDecision>,
434}
435
436impl AgentResponse {
437    /// Create a default allow response
438    pub fn default_allow() -> Self {
439        Self {
440            version: PROTOCOL_VERSION,
441            decision: Decision::Allow,
442            request_headers: vec![],
443            response_headers: vec![],
444            routing_metadata: HashMap::new(),
445            audit: AuditMetadata::default(),
446            needs_more: false,
447            request_body_mutation: None,
448            response_body_mutation: None,
449            websocket_decision: None,
450        }
451    }
452
453    /// Create a block response
454    pub fn block(status: u16, body: Option<String>) -> Self {
455        Self {
456            version: PROTOCOL_VERSION,
457            decision: Decision::Block {
458                status,
459                body,
460                headers: None,
461            },
462            request_headers: vec![],
463            response_headers: vec![],
464            routing_metadata: HashMap::new(),
465            audit: AuditMetadata::default(),
466            needs_more: false,
467            request_body_mutation: None,
468            response_body_mutation: None,
469            websocket_decision: None,
470        }
471    }
472
473    /// Create a redirect response
474    pub fn redirect(url: String, status: u16) -> Self {
475        Self {
476            version: PROTOCOL_VERSION,
477            decision: Decision::Redirect { url, status },
478            request_headers: vec![],
479            response_headers: vec![],
480            routing_metadata: HashMap::new(),
481            audit: AuditMetadata::default(),
482            needs_more: false,
483            request_body_mutation: None,
484            response_body_mutation: None,
485            websocket_decision: None,
486        }
487    }
488
489    /// Create a streaming response indicating more data is needed
490    pub fn needs_more_data() -> Self {
491        Self {
492            version: PROTOCOL_VERSION,
493            decision: Decision::Allow,
494            request_headers: vec![],
495            response_headers: vec![],
496            routing_metadata: HashMap::new(),
497            audit: AuditMetadata::default(),
498            needs_more: true,
499            request_body_mutation: None,
500            response_body_mutation: None,
501            websocket_decision: None,
502        }
503    }
504
505    /// Create a WebSocket allow response
506    pub fn websocket_allow() -> Self {
507        Self {
508            websocket_decision: Some(WebSocketDecision::Allow),
509            ..Self::default_allow()
510        }
511    }
512
513    /// Create a WebSocket drop response (drop the frame, don't forward)
514    pub fn websocket_drop() -> Self {
515        Self {
516            websocket_decision: Some(WebSocketDecision::Drop),
517            ..Self::default_allow()
518        }
519    }
520
521    /// Create a WebSocket close response (close the connection)
522    pub fn websocket_close(code: u16, reason: String) -> Self {
523        Self {
524            websocket_decision: Some(WebSocketDecision::Close { code, reason }),
525            ..Self::default_allow()
526        }
527    }
528
529    /// Set WebSocket decision
530    pub fn with_websocket_decision(mut self, decision: WebSocketDecision) -> Self {
531        self.websocket_decision = Some(decision);
532        self
533    }
534
535    /// Create a streaming response with body mutation
536    pub fn with_request_body_mutation(mut self, mutation: BodyMutation) -> Self {
537        self.request_body_mutation = Some(mutation);
538        self
539    }
540
541    /// Create a streaming response with response body mutation
542    pub fn with_response_body_mutation(mut self, mutation: BodyMutation) -> Self {
543        self.response_body_mutation = Some(mutation);
544        self
545    }
546
547    /// Set needs_more flag
548    pub fn set_needs_more(mut self, needs_more: bool) -> Self {
549        self.needs_more = needs_more;
550        self
551    }
552
553    /// Add a request header modification
554    pub fn add_request_header(mut self, op: HeaderOp) -> Self {
555        self.request_headers.push(op);
556        self
557    }
558
559    /// Add a response header modification
560    pub fn add_response_header(mut self, op: HeaderOp) -> Self {
561        self.response_headers.push(op);
562        self
563    }
564
565    /// Add audit metadata
566    pub fn with_audit(mut self, audit: AuditMetadata) -> Self {
567        self.audit = audit;
568        self
569    }
570}
571
572/// Audit metadata from agent
573#[derive(Debug, Clone, Default, Serialize, Deserialize)]
574pub struct AuditMetadata {
575    /// Tags for logging/metrics
576    #[serde(default)]
577    pub tags: Vec<String>,
578    /// Rule IDs that matched
579    #[serde(default)]
580    pub rule_ids: Vec<String>,
581    /// Confidence score (0.0 - 1.0)
582    pub confidence: Option<f32>,
583    /// Reason codes
584    #[serde(default)]
585    pub reason_codes: Vec<String>,
586    /// Custom metadata
587    #[serde(default)]
588    pub custom: HashMap<String, serde_json::Value>,
589}