sentinel_agent_protocol/protocol.rs
1//! Agent protocol types and constants.
2//!
3//! This module defines the wire protocol types for communication between
4//! the proxy dataplane and external processing agents.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Version number of the agent wire protocol defined in this module.
pub const PROTOCOL_VERSION: u32 = 1;
11
/// Upper bound on the size of a single protocol message, in bytes (10 MiB).
pub const MAX_MESSAGE_SIZE: usize = 10 << 20;
14
15/// Agent event type
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum EventType {
19 /// Agent configuration (sent once when agent connects)
20 Configure,
21 /// Request headers received
22 RequestHeaders,
23 /// Request body chunk received
24 RequestBodyChunk,
25 /// Response headers received
26 ResponseHeaders,
27 /// Response body chunk received
28 ResponseBodyChunk,
29 /// Request/response complete (for logging)
30 RequestComplete,
31 /// WebSocket frame received (after upgrade)
32 WebSocketFrame,
33}
34
35/// Agent decision
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
37#[serde(rename_all = "snake_case")]
38pub enum Decision {
39 /// Allow the request/response to continue
40 #[default]
41 Allow,
42 /// Block the request/response
43 Block {
44 /// HTTP status code to return
45 status: u16,
46 /// Optional response body
47 body: Option<String>,
48 /// Optional response headers
49 headers: Option<HashMap<String, String>>,
50 },
51 /// Redirect the request
52 Redirect {
53 /// Redirect URL
54 url: String,
55 /// HTTP status code (301, 302, 303, 307, 308)
56 status: u16,
57 },
58 /// Challenge the client (e.g., CAPTCHA)
59 Challenge {
60 /// Challenge type
61 challenge_type: String,
62 /// Challenge parameters
63 params: HashMap<String, String>,
64 },
65}
66
67/// Header modification operation
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "snake_case")]
70pub enum HeaderOp {
71 /// Set a header (replace if exists)
72 Set { name: String, value: String },
73 /// Add a header (append if exists)
74 Add { name: String, value: String },
75 /// Remove a header
76 Remove { name: String },
77}
78
79// ============================================================================
80// Body Mutation
81// ============================================================================
82
83/// Body mutation from agent
84///
85/// Allows agents to modify body content during streaming:
86/// - `None` data: pass through original chunk unchanged
87/// - `Some(empty)`: drop the chunk entirely
88/// - `Some(data)`: replace chunk with modified content
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90pub struct BodyMutation {
91 /// Modified body data (base64 encoded for JSON transport)
92 ///
93 /// - `None`: use original chunk unchanged
94 /// - `Some("")`: drop this chunk
95 /// - `Some(data)`: replace chunk with this data
96 pub data: Option<String>,
97
98 /// Chunk index this mutation applies to
99 ///
100 /// Must match the `chunk_index` from the body chunk event.
101 #[serde(default)]
102 pub chunk_index: u32,
103}
104
105impl BodyMutation {
106 /// Create a pass-through mutation (no change)
107 pub fn pass_through(chunk_index: u32) -> Self {
108 Self {
109 data: None,
110 chunk_index,
111 }
112 }
113
114 /// Create a mutation that drops the chunk
115 pub fn drop_chunk(chunk_index: u32) -> Self {
116 Self {
117 data: Some(String::new()),
118 chunk_index,
119 }
120 }
121
122 /// Create a mutation that replaces the chunk
123 pub fn replace(chunk_index: u32, data: String) -> Self {
124 Self {
125 data: Some(data),
126 chunk_index,
127 }
128 }
129
130 /// Check if this mutation passes through unchanged
131 pub fn is_pass_through(&self) -> bool {
132 self.data.is_none()
133 }
134
135 /// Check if this mutation drops the chunk
136 pub fn is_drop(&self) -> bool {
137 matches!(&self.data, Some(d) if d.is_empty())
138 }
139}
140
141/// Request metadata sent to agents
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct RequestMetadata {
144 /// Correlation ID for request tracing
145 pub correlation_id: String,
146 /// Request ID (internal)
147 pub request_id: String,
148 /// Client IP address
149 pub client_ip: String,
150 /// Client port
151 pub client_port: u16,
152 /// Server name (SNI or Host header)
153 pub server_name: Option<String>,
154 /// Protocol (HTTP/1.1, HTTP/2, etc.)
155 pub protocol: String,
156 /// TLS version if applicable
157 pub tls_version: Option<String>,
158 /// TLS cipher suite if applicable
159 pub tls_cipher: Option<String>,
160 /// Route ID that matched
161 pub route_id: Option<String>,
162 /// Upstream ID
163 pub upstream_id: Option<String>,
164 /// Request start timestamp (RFC3339)
165 pub timestamp: String,
166}
167
168/// Configure event
169///
170/// Sent once when an agent connects, before any request events.
171/// Contains agent-specific configuration from the proxy's config file.
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct ConfigureEvent {
174 /// Agent ID (from proxy config)
175 pub agent_id: String,
176 /// Agent-specific configuration (JSON object)
177 ///
178 /// The structure of this config depends on the agent type.
179 /// Agents should parse this into their own config struct.
180 pub config: serde_json::Value,
181}
182
183/// Request headers event
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct RequestHeadersEvent {
186 /// Event metadata
187 pub metadata: RequestMetadata,
188 /// HTTP method
189 pub method: String,
190 /// Request URI
191 pub uri: String,
192 /// HTTP headers
193 pub headers: HashMap<String, Vec<String>>,
194}
195
196/// Request body chunk event
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct RequestBodyChunkEvent {
199 /// Correlation ID
200 pub correlation_id: String,
201 /// Body chunk data (base64 encoded for JSON transport)
202 pub data: String,
203 /// Is this the last chunk?
204 pub is_last: bool,
205 /// Total body size if known
206 pub total_size: Option<usize>,
207 /// Chunk index for ordering (0-based)
208 ///
209 /// Used to match mutations to chunks and ensure ordering.
210 #[serde(default)]
211 pub chunk_index: u32,
212 /// Bytes received so far (cumulative)
213 #[serde(default)]
214 pub bytes_received: usize,
215}
216
217/// Response headers event
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct ResponseHeadersEvent {
220 /// Correlation ID
221 pub correlation_id: String,
222 /// HTTP status code
223 pub status: u16,
224 /// HTTP headers
225 pub headers: HashMap<String, Vec<String>>,
226}
227
228/// Response body chunk event
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct ResponseBodyChunkEvent {
231 /// Correlation ID
232 pub correlation_id: String,
233 /// Body chunk data (base64 encoded for JSON transport)
234 pub data: String,
235 /// Is this the last chunk?
236 pub is_last: bool,
237 /// Total body size if known
238 pub total_size: Option<usize>,
239 /// Chunk index for ordering (0-based)
240 #[serde(default)]
241 pub chunk_index: u32,
242 /// Bytes sent so far (cumulative)
243 #[serde(default)]
244 pub bytes_sent: usize,
245}
246
247/// Request complete event (for logging/audit)
248#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct RequestCompleteEvent {
250 /// Correlation ID
251 pub correlation_id: String,
252 /// Final HTTP status code
253 pub status: u16,
254 /// Request duration in milliseconds
255 pub duration_ms: u64,
256 /// Request body size
257 pub request_body_size: usize,
258 /// Response body size
259 pub response_body_size: usize,
260 /// Upstream attempts
261 pub upstream_attempts: u32,
262 /// Error if any
263 pub error: Option<String>,
264}
265
266// ============================================================================
267// WebSocket Frame Events
268// ============================================================================
269
270/// WebSocket frame event
271///
272/// Sent to agents after a WebSocket upgrade when frame inspection is enabled.
273/// Each frame is sent individually for inspection.
274#[derive(Debug, Clone, Serialize, Deserialize)]
275pub struct WebSocketFrameEvent {
276 /// Correlation ID (same as the original HTTP upgrade request)
277 pub correlation_id: String,
278 /// Frame opcode: "text", "binary", "ping", "pong", "close", "continuation"
279 pub opcode: String,
280 /// Frame payload (base64 encoded for JSON transport)
281 pub data: String,
282 /// Direction: true = client->server, false = server->client
283 pub client_to_server: bool,
284 /// Frame index for this connection (0-based, per direction)
285 pub frame_index: u64,
286 /// FIN bit - true if final frame of message (for fragmented messages)
287 pub fin: bool,
288 /// Route ID
289 pub route_id: Option<String>,
290 /// Client IP
291 pub client_ip: String,
292}
293
294/// WebSocket opcode
295#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
296#[serde(rename_all = "snake_case")]
297pub enum WebSocketOpcode {
298 /// Continuation frame (0x0)
299 Continuation,
300 /// Text frame (0x1)
301 Text,
302 /// Binary frame (0x2)
303 Binary,
304 /// Connection close (0x8)
305 Close,
306 /// Ping (0x9)
307 Ping,
308 /// Pong (0xA)
309 Pong,
310}
311
312impl WebSocketOpcode {
313 /// Convert opcode to string representation
314 pub fn as_str(&self) -> &'static str {
315 match self {
316 Self::Continuation => "continuation",
317 Self::Text => "text",
318 Self::Binary => "binary",
319 Self::Close => "close",
320 Self::Ping => "ping",
321 Self::Pong => "pong",
322 }
323 }
324
325 /// Parse from byte value
326 pub fn from_u8(value: u8) -> Option<Self> {
327 match value {
328 0x0 => Some(Self::Continuation),
329 0x1 => Some(Self::Text),
330 0x2 => Some(Self::Binary),
331 0x8 => Some(Self::Close),
332 0x9 => Some(Self::Ping),
333 0xA => Some(Self::Pong),
334 _ => None,
335 }
336 }
337
338 /// Convert to byte value
339 pub fn as_u8(&self) -> u8 {
340 match self {
341 Self::Continuation => 0x0,
342 Self::Text => 0x1,
343 Self::Binary => 0x2,
344 Self::Close => 0x8,
345 Self::Ping => 0x9,
346 Self::Pong => 0xA,
347 }
348 }
349}
350
351/// WebSocket frame decision
352///
353/// Agents return this decision for WebSocket frame events.
354#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
355#[serde(rename_all = "snake_case")]
356pub enum WebSocketDecision {
357 /// Allow frame to pass through
358 #[default]
359 Allow,
360 /// Drop this frame silently (don't forward)
361 Drop,
362 /// Close the WebSocket connection
363 Close {
364 /// Close code (RFC 6455 section 7.4.1)
365 code: u16,
366 /// Close reason
367 reason: String,
368 },
369}
370
371/// Agent request message
372#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct AgentRequest {
374 /// Protocol version
375 pub version: u32,
376 /// Event type
377 pub event_type: EventType,
378 /// Event payload (JSON)
379 pub payload: serde_json::Value,
380}
381
382/// Agent response message
383#[derive(Debug, Clone, Serialize, Deserialize)]
384pub struct AgentResponse {
385 /// Protocol version
386 pub version: u32,
387 /// Decision
388 pub decision: Decision,
389 /// Header modifications for request
390 #[serde(default)]
391 pub request_headers: Vec<HeaderOp>,
392 /// Header modifications for response
393 #[serde(default)]
394 pub response_headers: Vec<HeaderOp>,
395 /// Routing metadata modifications
396 #[serde(default)]
397 pub routing_metadata: HashMap<String, String>,
398 /// Audit metadata
399 #[serde(default)]
400 pub audit: AuditMetadata,
401
402 // ========================================================================
403 // Streaming-specific fields
404 // ========================================================================
405 /// Agent needs more data to make a final decision
406 ///
407 /// When `true`, the current `decision` is provisional and may change
408 /// after processing more body chunks. The proxy should continue
409 /// streaming body data to this agent.
410 ///
411 /// When `false` (default), the decision is final.
412 #[serde(default)]
413 pub needs_more: bool,
414
415 /// Request body mutation (for streaming mode)
416 ///
417 /// If present, applies the mutation to the current request body chunk.
418 /// Only valid for `RequestBodyChunk` events.
419 #[serde(default)]
420 pub request_body_mutation: Option<BodyMutation>,
421
422 /// Response body mutation (for streaming mode)
423 ///
424 /// If present, applies the mutation to the current response body chunk.
425 /// Only valid for `ResponseBodyChunk` events.
426 #[serde(default)]
427 pub response_body_mutation: Option<BodyMutation>,
428
429 /// WebSocket frame decision
430 ///
431 /// Only valid for `WebSocketFrame` events. If not set, defaults to Allow.
432 #[serde(default)]
433 pub websocket_decision: Option<WebSocketDecision>,
434}
435
436impl AgentResponse {
437 /// Create a default allow response
438 pub fn default_allow() -> Self {
439 Self {
440 version: PROTOCOL_VERSION,
441 decision: Decision::Allow,
442 request_headers: vec![],
443 response_headers: vec![],
444 routing_metadata: HashMap::new(),
445 audit: AuditMetadata::default(),
446 needs_more: false,
447 request_body_mutation: None,
448 response_body_mutation: None,
449 websocket_decision: None,
450 }
451 }
452
453 /// Create a block response
454 pub fn block(status: u16, body: Option<String>) -> Self {
455 Self {
456 version: PROTOCOL_VERSION,
457 decision: Decision::Block {
458 status,
459 body,
460 headers: None,
461 },
462 request_headers: vec![],
463 response_headers: vec![],
464 routing_metadata: HashMap::new(),
465 audit: AuditMetadata::default(),
466 needs_more: false,
467 request_body_mutation: None,
468 response_body_mutation: None,
469 websocket_decision: None,
470 }
471 }
472
473 /// Create a redirect response
474 pub fn redirect(url: String, status: u16) -> Self {
475 Self {
476 version: PROTOCOL_VERSION,
477 decision: Decision::Redirect { url, status },
478 request_headers: vec![],
479 response_headers: vec![],
480 routing_metadata: HashMap::new(),
481 audit: AuditMetadata::default(),
482 needs_more: false,
483 request_body_mutation: None,
484 response_body_mutation: None,
485 websocket_decision: None,
486 }
487 }
488
489 /// Create a streaming response indicating more data is needed
490 pub fn needs_more_data() -> Self {
491 Self {
492 version: PROTOCOL_VERSION,
493 decision: Decision::Allow,
494 request_headers: vec![],
495 response_headers: vec![],
496 routing_metadata: HashMap::new(),
497 audit: AuditMetadata::default(),
498 needs_more: true,
499 request_body_mutation: None,
500 response_body_mutation: None,
501 websocket_decision: None,
502 }
503 }
504
505 /// Create a WebSocket allow response
506 pub fn websocket_allow() -> Self {
507 Self {
508 websocket_decision: Some(WebSocketDecision::Allow),
509 ..Self::default_allow()
510 }
511 }
512
513 /// Create a WebSocket drop response (drop the frame, don't forward)
514 pub fn websocket_drop() -> Self {
515 Self {
516 websocket_decision: Some(WebSocketDecision::Drop),
517 ..Self::default_allow()
518 }
519 }
520
521 /// Create a WebSocket close response (close the connection)
522 pub fn websocket_close(code: u16, reason: String) -> Self {
523 Self {
524 websocket_decision: Some(WebSocketDecision::Close { code, reason }),
525 ..Self::default_allow()
526 }
527 }
528
529 /// Set WebSocket decision
530 pub fn with_websocket_decision(mut self, decision: WebSocketDecision) -> Self {
531 self.websocket_decision = Some(decision);
532 self
533 }
534
535 /// Create a streaming response with body mutation
536 pub fn with_request_body_mutation(mut self, mutation: BodyMutation) -> Self {
537 self.request_body_mutation = Some(mutation);
538 self
539 }
540
541 /// Create a streaming response with response body mutation
542 pub fn with_response_body_mutation(mut self, mutation: BodyMutation) -> Self {
543 self.response_body_mutation = Some(mutation);
544 self
545 }
546
547 /// Set needs_more flag
548 pub fn set_needs_more(mut self, needs_more: bool) -> Self {
549 self.needs_more = needs_more;
550 self
551 }
552
553 /// Add a request header modification
554 pub fn add_request_header(mut self, op: HeaderOp) -> Self {
555 self.request_headers.push(op);
556 self
557 }
558
559 /// Add a response header modification
560 pub fn add_response_header(mut self, op: HeaderOp) -> Self {
561 self.response_headers.push(op);
562 self
563 }
564
565 /// Add audit metadata
566 pub fn with_audit(mut self, audit: AuditMetadata) -> Self {
567 self.audit = audit;
568 self
569 }
570}
571
572/// Audit metadata from agent
573#[derive(Debug, Clone, Default, Serialize, Deserialize)]
574pub struct AuditMetadata {
575 /// Tags for logging/metrics
576 #[serde(default)]
577 pub tags: Vec<String>,
578 /// Rule IDs that matched
579 #[serde(default)]
580 pub rule_ids: Vec<String>,
581 /// Confidence score (0.0 - 1.0)
582 pub confidence: Option<f32>,
583 /// Reason codes
584 #[serde(default)]
585 pub reason_codes: Vec<String>,
586 /// Custom metadata
587 #[serde(default)]
588 pub custom: HashMap<String, serde_json::Value>,
589}