sentinel_agent_protocol/
protocol.rs

//! Agent protocol types and constants.
//!
//! This module defines the wire protocol types for communication between
//! the proxy dataplane and external processing agents.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Version number of the agent wire protocol described by this module.
///
/// The `AgentResponse` constructors stamp this value into the `version`
/// field of every response they build.
pub const PROTOCOL_VERSION: u32 = 1;
11
/// Upper bound on the size of a single protocol message: 10 MiB
/// (10 * 1024 * 1024 = 10 485 760 bytes).
pub const MAX_MESSAGE_SIZE: usize = 10 << 20;
14
15/// Agent event type
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum EventType {
19    /// Agent configuration (sent once when agent connects)
20    Configure,
21    /// Request headers received
22    RequestHeaders,
23    /// Request body chunk received
24    RequestBodyChunk,
25    /// Response headers received
26    ResponseHeaders,
27    /// Response body chunk received
28    ResponseBodyChunk,
29    /// Request/response complete (for logging)
30    RequestComplete,
31    /// WebSocket frame received (after upgrade)
32    WebSocketFrame,
33}
34
35/// Agent decision
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
37#[serde(rename_all = "snake_case")]
38pub enum Decision {
39    /// Allow the request/response to continue
40    #[default]
41    Allow,
42    /// Block the request/response
43    Block {
44        /// HTTP status code to return
45        status: u16,
46        /// Optional response body
47        body: Option<String>,
48        /// Optional response headers
49        headers: Option<HashMap<String, String>>,
50    },
51    /// Redirect the request
52    Redirect {
53        /// Redirect URL
54        url: String,
55        /// HTTP status code (301, 302, 303, 307, 308)
56        status: u16,
57    },
58    /// Challenge the client (e.g., CAPTCHA)
59    Challenge {
60        /// Challenge type
61        challenge_type: String,
62        /// Challenge parameters
63        params: HashMap<String, String>,
64    },
65}
66
67/// Header modification operation
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "snake_case")]
70pub enum HeaderOp {
71    /// Set a header (replace if exists)
72    Set { name: String, value: String },
73    /// Add a header (append if exists)
74    Add { name: String, value: String },
75    /// Remove a header
76    Remove { name: String },
77}
78
// ============================================================================
// Body Mutation
// ============================================================================
82
83/// Body mutation from agent
84///
85/// Allows agents to modify body content during streaming:
86/// - `None` data: pass through original chunk unchanged
87/// - `Some(empty)`: drop the chunk entirely
88/// - `Some(data)`: replace chunk with modified content
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90pub struct BodyMutation {
91    /// Modified body data (base64 encoded for JSON transport)
92    ///
93    /// - `None`: use original chunk unchanged
94    /// - `Some("")`: drop this chunk
95    /// - `Some(data)`: replace chunk with this data
96    pub data: Option<String>,
97
98    /// Chunk index this mutation applies to
99    ///
100    /// Must match the `chunk_index` from the body chunk event.
101    #[serde(default)]
102    pub chunk_index: u32,
103}
104
105impl BodyMutation {
106    /// Create a pass-through mutation (no change)
107    pub fn pass_through(chunk_index: u32) -> Self {
108        Self {
109            data: None,
110            chunk_index,
111        }
112    }
113
114    /// Create a mutation that drops the chunk
115    pub fn drop_chunk(chunk_index: u32) -> Self {
116        Self {
117            data: Some(String::new()),
118            chunk_index,
119        }
120    }
121
122    /// Create a mutation that replaces the chunk
123    pub fn replace(chunk_index: u32, data: String) -> Self {
124        Self {
125            data: Some(data),
126            chunk_index,
127        }
128    }
129
130    /// Check if this mutation passes through unchanged
131    pub fn is_pass_through(&self) -> bool {
132        self.data.is_none()
133    }
134
135    /// Check if this mutation drops the chunk
136    pub fn is_drop(&self) -> bool {
137        matches!(&self.data, Some(d) if d.is_empty())
138    }
139}
140
141/// Request metadata sent to agents
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct RequestMetadata {
144    /// Correlation ID for request tracing
145    pub correlation_id: String,
146    /// Request ID (internal)
147    pub request_id: String,
148    /// Client IP address
149    pub client_ip: String,
150    /// Client port
151    pub client_port: u16,
152    /// Server name (SNI or Host header)
153    pub server_name: Option<String>,
154    /// Protocol (HTTP/1.1, HTTP/2, etc.)
155    pub protocol: String,
156    /// TLS version if applicable
157    pub tls_version: Option<String>,
158    /// TLS cipher suite if applicable
159    pub tls_cipher: Option<String>,
160    /// Route ID that matched
161    pub route_id: Option<String>,
162    /// Upstream ID
163    pub upstream_id: Option<String>,
164    /// Request start timestamp (RFC3339)
165    pub timestamp: String,
166    /// W3C Trace Context traceparent header (for distributed tracing)
167    ///
168    /// Format: `{version}-{trace-id}-{parent-id}-{trace-flags}`
169    /// Example: `00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01`
170    ///
171    /// Agents can use this to create child spans that link to the proxy's span.
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub traceparent: Option<String>,
174}
175
176/// Configure event
177///
178/// Sent once when an agent connects, before any request events.
179/// Contains agent-specific configuration from the proxy's config file.
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct ConfigureEvent {
182    /// Agent ID (from proxy config)
183    pub agent_id: String,
184    /// Agent-specific configuration (JSON object)
185    ///
186    /// The structure of this config depends on the agent type.
187    /// Agents should parse this into their own config struct.
188    pub config: serde_json::Value,
189}
190
191/// Request headers event
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct RequestHeadersEvent {
194    /// Event metadata
195    pub metadata: RequestMetadata,
196    /// HTTP method
197    pub method: String,
198    /// Request URI
199    pub uri: String,
200    /// HTTP headers
201    pub headers: HashMap<String, Vec<String>>,
202}
203
204/// Request body chunk event
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct RequestBodyChunkEvent {
207    /// Correlation ID
208    pub correlation_id: String,
209    /// Body chunk data (base64 encoded for JSON transport)
210    pub data: String,
211    /// Is this the last chunk?
212    pub is_last: bool,
213    /// Total body size if known
214    pub total_size: Option<usize>,
215    /// Chunk index for ordering (0-based)
216    ///
217    /// Used to match mutations to chunks and ensure ordering.
218    #[serde(default)]
219    pub chunk_index: u32,
220    /// Bytes received so far (cumulative)
221    #[serde(default)]
222    pub bytes_received: usize,
223}
224
225/// Response headers event
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct ResponseHeadersEvent {
228    /// Correlation ID
229    pub correlation_id: String,
230    /// HTTP status code
231    pub status: u16,
232    /// HTTP headers
233    pub headers: HashMap<String, Vec<String>>,
234}
235
236/// Response body chunk event
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct ResponseBodyChunkEvent {
239    /// Correlation ID
240    pub correlation_id: String,
241    /// Body chunk data (base64 encoded for JSON transport)
242    pub data: String,
243    /// Is this the last chunk?
244    pub is_last: bool,
245    /// Total body size if known
246    pub total_size: Option<usize>,
247    /// Chunk index for ordering (0-based)
248    #[serde(default)]
249    pub chunk_index: u32,
250    /// Bytes sent so far (cumulative)
251    #[serde(default)]
252    pub bytes_sent: usize,
253}
254
255/// Request complete event (for logging/audit)
256#[derive(Debug, Clone, Serialize, Deserialize)]
257pub struct RequestCompleteEvent {
258    /// Correlation ID
259    pub correlation_id: String,
260    /// Final HTTP status code
261    pub status: u16,
262    /// Request duration in milliseconds
263    pub duration_ms: u64,
264    /// Request body size
265    pub request_body_size: usize,
266    /// Response body size
267    pub response_body_size: usize,
268    /// Upstream attempts
269    pub upstream_attempts: u32,
270    /// Error if any
271    pub error: Option<String>,
272}
273
// ============================================================================
// WebSocket Frame Events
// ============================================================================
277
278/// WebSocket frame event
279///
280/// Sent to agents after a WebSocket upgrade when frame inspection is enabled.
281/// Each frame is sent individually for inspection.
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct WebSocketFrameEvent {
284    /// Correlation ID (same as the original HTTP upgrade request)
285    pub correlation_id: String,
286    /// Frame opcode: "text", "binary", "ping", "pong", "close", "continuation"
287    pub opcode: String,
288    /// Frame payload (base64 encoded for JSON transport)
289    pub data: String,
290    /// Direction: true = client->server, false = server->client
291    pub client_to_server: bool,
292    /// Frame index for this connection (0-based, per direction)
293    pub frame_index: u64,
294    /// FIN bit - true if final frame of message (for fragmented messages)
295    pub fin: bool,
296    /// Route ID
297    pub route_id: Option<String>,
298    /// Client IP
299    pub client_ip: String,
300}
301
302/// WebSocket opcode
303#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
304#[serde(rename_all = "snake_case")]
305pub enum WebSocketOpcode {
306    /// Continuation frame (0x0)
307    Continuation,
308    /// Text frame (0x1)
309    Text,
310    /// Binary frame (0x2)
311    Binary,
312    /// Connection close (0x8)
313    Close,
314    /// Ping (0x9)
315    Ping,
316    /// Pong (0xA)
317    Pong,
318}
319
320impl WebSocketOpcode {
321    /// Convert opcode to string representation
322    pub fn as_str(&self) -> &'static str {
323        match self {
324            Self::Continuation => "continuation",
325            Self::Text => "text",
326            Self::Binary => "binary",
327            Self::Close => "close",
328            Self::Ping => "ping",
329            Self::Pong => "pong",
330        }
331    }
332
333    /// Parse from byte value
334    pub fn from_u8(value: u8) -> Option<Self> {
335        match value {
336            0x0 => Some(Self::Continuation),
337            0x1 => Some(Self::Text),
338            0x2 => Some(Self::Binary),
339            0x8 => Some(Self::Close),
340            0x9 => Some(Self::Ping),
341            0xA => Some(Self::Pong),
342            _ => None,
343        }
344    }
345
346    /// Convert to byte value
347    pub fn as_u8(&self) -> u8 {
348        match self {
349            Self::Continuation => 0x0,
350            Self::Text => 0x1,
351            Self::Binary => 0x2,
352            Self::Close => 0x8,
353            Self::Ping => 0x9,
354            Self::Pong => 0xA,
355        }
356    }
357}
358
359/// WebSocket frame decision
360///
361/// Agents return this decision for WebSocket frame events.
362#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
363#[serde(rename_all = "snake_case")]
364pub enum WebSocketDecision {
365    /// Allow frame to pass through
366    #[default]
367    Allow,
368    /// Drop this frame silently (don't forward)
369    Drop,
370    /// Close the WebSocket connection
371    Close {
372        /// Close code (RFC 6455 section 7.4.1)
373        code: u16,
374        /// Close reason
375        reason: String,
376    },
377}
378
379/// Agent request message
380#[derive(Debug, Clone, Serialize, Deserialize)]
381pub struct AgentRequest {
382    /// Protocol version
383    pub version: u32,
384    /// Event type
385    pub event_type: EventType,
386    /// Event payload (JSON)
387    pub payload: serde_json::Value,
388}
389
390/// Agent response message
391#[derive(Debug, Clone, Serialize, Deserialize)]
392pub struct AgentResponse {
393    /// Protocol version
394    pub version: u32,
395    /// Decision
396    pub decision: Decision,
397    /// Header modifications for request
398    #[serde(default)]
399    pub request_headers: Vec<HeaderOp>,
400    /// Header modifications for response
401    #[serde(default)]
402    pub response_headers: Vec<HeaderOp>,
403    /// Routing metadata modifications
404    #[serde(default)]
405    pub routing_metadata: HashMap<String, String>,
406    /// Audit metadata
407    #[serde(default)]
408    pub audit: AuditMetadata,
409
410    // ========================================================================
411    // Streaming-specific fields
412    // ========================================================================
413    /// Agent needs more data to make a final decision
414    ///
415    /// When `true`, the current `decision` is provisional and may change
416    /// after processing more body chunks. The proxy should continue
417    /// streaming body data to this agent.
418    ///
419    /// When `false` (default), the decision is final.
420    #[serde(default)]
421    pub needs_more: bool,
422
423    /// Request body mutation (for streaming mode)
424    ///
425    /// If present, applies the mutation to the current request body chunk.
426    /// Only valid for `RequestBodyChunk` events.
427    #[serde(default)]
428    pub request_body_mutation: Option<BodyMutation>,
429
430    /// Response body mutation (for streaming mode)
431    ///
432    /// If present, applies the mutation to the current response body chunk.
433    /// Only valid for `ResponseBodyChunk` events.
434    #[serde(default)]
435    pub response_body_mutation: Option<BodyMutation>,
436
437    /// WebSocket frame decision
438    ///
439    /// Only valid for `WebSocketFrame` events. If not set, defaults to Allow.
440    #[serde(default)]
441    pub websocket_decision: Option<WebSocketDecision>,
442}
443
444impl AgentResponse {
445    /// Create a default allow response
446    pub fn default_allow() -> Self {
447        Self {
448            version: PROTOCOL_VERSION,
449            decision: Decision::Allow,
450            request_headers: vec![],
451            response_headers: vec![],
452            routing_metadata: HashMap::new(),
453            audit: AuditMetadata::default(),
454            needs_more: false,
455            request_body_mutation: None,
456            response_body_mutation: None,
457            websocket_decision: None,
458        }
459    }
460
461    /// Create a block response
462    pub fn block(status: u16, body: Option<String>) -> Self {
463        Self {
464            version: PROTOCOL_VERSION,
465            decision: Decision::Block {
466                status,
467                body,
468                headers: None,
469            },
470            request_headers: vec![],
471            response_headers: vec![],
472            routing_metadata: HashMap::new(),
473            audit: AuditMetadata::default(),
474            needs_more: false,
475            request_body_mutation: None,
476            response_body_mutation: None,
477            websocket_decision: None,
478        }
479    }
480
481    /// Create a redirect response
482    pub fn redirect(url: String, status: u16) -> Self {
483        Self {
484            version: PROTOCOL_VERSION,
485            decision: Decision::Redirect { url, status },
486            request_headers: vec![],
487            response_headers: vec![],
488            routing_metadata: HashMap::new(),
489            audit: AuditMetadata::default(),
490            needs_more: false,
491            request_body_mutation: None,
492            response_body_mutation: None,
493            websocket_decision: None,
494        }
495    }
496
497    /// Create a streaming response indicating more data is needed
498    pub fn needs_more_data() -> Self {
499        Self {
500            version: PROTOCOL_VERSION,
501            decision: Decision::Allow,
502            request_headers: vec![],
503            response_headers: vec![],
504            routing_metadata: HashMap::new(),
505            audit: AuditMetadata::default(),
506            needs_more: true,
507            request_body_mutation: None,
508            response_body_mutation: None,
509            websocket_decision: None,
510        }
511    }
512
513    /// Create a WebSocket allow response
514    pub fn websocket_allow() -> Self {
515        Self {
516            websocket_decision: Some(WebSocketDecision::Allow),
517            ..Self::default_allow()
518        }
519    }
520
521    /// Create a WebSocket drop response (drop the frame, don't forward)
522    pub fn websocket_drop() -> Self {
523        Self {
524            websocket_decision: Some(WebSocketDecision::Drop),
525            ..Self::default_allow()
526        }
527    }
528
529    /// Create a WebSocket close response (close the connection)
530    pub fn websocket_close(code: u16, reason: String) -> Self {
531        Self {
532            websocket_decision: Some(WebSocketDecision::Close { code, reason }),
533            ..Self::default_allow()
534        }
535    }
536
537    /// Set WebSocket decision
538    pub fn with_websocket_decision(mut self, decision: WebSocketDecision) -> Self {
539        self.websocket_decision = Some(decision);
540        self
541    }
542
543    /// Create a streaming response with body mutation
544    pub fn with_request_body_mutation(mut self, mutation: BodyMutation) -> Self {
545        self.request_body_mutation = Some(mutation);
546        self
547    }
548
549    /// Create a streaming response with response body mutation
550    pub fn with_response_body_mutation(mut self, mutation: BodyMutation) -> Self {
551        self.response_body_mutation = Some(mutation);
552        self
553    }
554
555    /// Set needs_more flag
556    pub fn set_needs_more(mut self, needs_more: bool) -> Self {
557        self.needs_more = needs_more;
558        self
559    }
560
561    /// Add a request header modification
562    pub fn add_request_header(mut self, op: HeaderOp) -> Self {
563        self.request_headers.push(op);
564        self
565    }
566
567    /// Add a response header modification
568    pub fn add_response_header(mut self, op: HeaderOp) -> Self {
569        self.response_headers.push(op);
570        self
571    }
572
573    /// Add audit metadata
574    pub fn with_audit(mut self, audit: AuditMetadata) -> Self {
575        self.audit = audit;
576        self
577    }
578}
579
580/// Audit metadata from agent
581#[derive(Debug, Clone, Default, Serialize, Deserialize)]
582pub struct AuditMetadata {
583    /// Tags for logging/metrics
584    #[serde(default)]
585    pub tags: Vec<String>,
586    /// Rule IDs that matched
587    #[serde(default)]
588    pub rule_ids: Vec<String>,
589    /// Confidence score (0.0 - 1.0)
590    pub confidence: Option<f32>,
591    /// Reason codes
592    #[serde(default)]
593    pub reason_codes: Vec<String>,
594    /// Custom metadata
595    #[serde(default)]
596    pub custom: HashMap<String, serde_json::Value>,
597}