sentinel_agent_protocol/protocol.rs
1//! Agent protocol types and constants.
2//!
3//! This module defines the wire protocol types for communication between
4//! the proxy dataplane and external processing agents.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Version number of the agent wire protocol.
///
/// The `AgentResponse` constructors write this value into the response's
/// `version` field.
pub const PROTOCOL_VERSION: u32 = 1;

/// Largest permitted size of a single protocol message.
///
/// The value is 10 MiB (10 × 1024 × 1024).
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
14
15/// Agent event type
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum EventType {
19 /// Agent configuration (sent once when agent connects)
20 Configure,
21 /// Request headers received
22 RequestHeaders,
23 /// Request body chunk received
24 RequestBodyChunk,
25 /// Response headers received
26 ResponseHeaders,
27 /// Response body chunk received
28 ResponseBodyChunk,
29 /// Request/response complete (for logging)
30 RequestComplete,
31 /// WebSocket frame received (after upgrade)
32 WebSocketFrame,
33}
34
35/// Agent decision
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
37#[serde(rename_all = "snake_case")]
38pub enum Decision {
39 /// Allow the request/response to continue
40 #[default]
41 Allow,
42 /// Block the request/response
43 Block {
44 /// HTTP status code to return
45 status: u16,
46 /// Optional response body
47 body: Option<String>,
48 /// Optional response headers
49 headers: Option<HashMap<String, String>>,
50 },
51 /// Redirect the request
52 Redirect {
53 /// Redirect URL
54 url: String,
55 /// HTTP status code (301, 302, 303, 307, 308)
56 status: u16,
57 },
58 /// Challenge the client (e.g., CAPTCHA)
59 Challenge {
60 /// Challenge type
61 challenge_type: String,
62 /// Challenge parameters
63 params: HashMap<String, String>,
64 },
65}
66
67/// Header modification operation
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "snake_case")]
70pub enum HeaderOp {
71 /// Set a header (replace if exists)
72 Set { name: String, value: String },
73 /// Add a header (append if exists)
74 Add { name: String, value: String },
75 /// Remove a header
76 Remove { name: String },
77}
78
79// ============================================================================
80// Body Mutation
81// ============================================================================
82
83/// Body mutation from agent
84///
85/// Allows agents to modify body content during streaming:
86/// - `None` data: pass through original chunk unchanged
87/// - `Some(empty)`: drop the chunk entirely
88/// - `Some(data)`: replace chunk with modified content
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90pub struct BodyMutation {
91 /// Modified body data (base64 encoded for JSON transport)
92 ///
93 /// - `None`: use original chunk unchanged
94 /// - `Some("")`: drop this chunk
95 /// - `Some(data)`: replace chunk with this data
96 pub data: Option<String>,
97
98 /// Chunk index this mutation applies to
99 ///
100 /// Must match the `chunk_index` from the body chunk event.
101 #[serde(default)]
102 pub chunk_index: u32,
103}
104
105impl BodyMutation {
106 /// Create a pass-through mutation (no change)
107 pub fn pass_through(chunk_index: u32) -> Self {
108 Self {
109 data: None,
110 chunk_index,
111 }
112 }
113
114 /// Create a mutation that drops the chunk
115 pub fn drop_chunk(chunk_index: u32) -> Self {
116 Self {
117 data: Some(String::new()),
118 chunk_index,
119 }
120 }
121
122 /// Create a mutation that replaces the chunk
123 pub fn replace(chunk_index: u32, data: String) -> Self {
124 Self {
125 data: Some(data),
126 chunk_index,
127 }
128 }
129
130 /// Check if this mutation passes through unchanged
131 pub fn is_pass_through(&self) -> bool {
132 self.data.is_none()
133 }
134
135 /// Check if this mutation drops the chunk
136 pub fn is_drop(&self) -> bool {
137 matches!(&self.data, Some(d) if d.is_empty())
138 }
139}
140
141/// Request metadata sent to agents
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct RequestMetadata {
144 /// Correlation ID for request tracing
145 pub correlation_id: String,
146 /// Request ID (internal)
147 pub request_id: String,
148 /// Client IP address
149 pub client_ip: String,
150 /// Client port
151 pub client_port: u16,
152 /// Server name (SNI or Host header)
153 pub server_name: Option<String>,
154 /// Protocol (HTTP/1.1, HTTP/2, etc.)
155 pub protocol: String,
156 /// TLS version if applicable
157 pub tls_version: Option<String>,
158 /// TLS cipher suite if applicable
159 pub tls_cipher: Option<String>,
160 /// Route ID that matched
161 pub route_id: Option<String>,
162 /// Upstream ID
163 pub upstream_id: Option<String>,
164 /// Request start timestamp (RFC3339)
165 pub timestamp: String,
166 /// W3C Trace Context traceparent header (for distributed tracing)
167 ///
168 /// Format: `{version}-{trace-id}-{parent-id}-{trace-flags}`
169 /// Example: `00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01`
170 ///
171 /// Agents can use this to create child spans that link to the proxy's span.
172 #[serde(skip_serializing_if = "Option::is_none")]
173 pub traceparent: Option<String>,
174}
175
176/// Configure event
177///
178/// Sent once when an agent connects, before any request events.
179/// Contains agent-specific configuration from the proxy's config file.
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct ConfigureEvent {
182 /// Agent ID (from proxy config)
183 pub agent_id: String,
184 /// Agent-specific configuration (JSON object)
185 ///
186 /// The structure of this config depends on the agent type.
187 /// Agents should parse this into their own config struct.
188 pub config: serde_json::Value,
189}
190
191/// Request headers event
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct RequestHeadersEvent {
194 /// Event metadata
195 pub metadata: RequestMetadata,
196 /// HTTP method
197 pub method: String,
198 /// Request URI
199 pub uri: String,
200 /// HTTP headers
201 pub headers: HashMap<String, Vec<String>>,
202}
203
204/// Request body chunk event
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct RequestBodyChunkEvent {
207 /// Correlation ID
208 pub correlation_id: String,
209 /// Body chunk data (base64 encoded for JSON transport)
210 pub data: String,
211 /// Is this the last chunk?
212 pub is_last: bool,
213 /// Total body size if known
214 pub total_size: Option<usize>,
215 /// Chunk index for ordering (0-based)
216 ///
217 /// Used to match mutations to chunks and ensure ordering.
218 #[serde(default)]
219 pub chunk_index: u32,
220 /// Bytes received so far (cumulative)
221 #[serde(default)]
222 pub bytes_received: usize,
223}
224
225/// Response headers event
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct ResponseHeadersEvent {
228 /// Correlation ID
229 pub correlation_id: String,
230 /// HTTP status code
231 pub status: u16,
232 /// HTTP headers
233 pub headers: HashMap<String, Vec<String>>,
234}
235
236/// Response body chunk event
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct ResponseBodyChunkEvent {
239 /// Correlation ID
240 pub correlation_id: String,
241 /// Body chunk data (base64 encoded for JSON transport)
242 pub data: String,
243 /// Is this the last chunk?
244 pub is_last: bool,
245 /// Total body size if known
246 pub total_size: Option<usize>,
247 /// Chunk index for ordering (0-based)
248 #[serde(default)]
249 pub chunk_index: u32,
250 /// Bytes sent so far (cumulative)
251 #[serde(default)]
252 pub bytes_sent: usize,
253}
254
255/// Request complete event (for logging/audit)
256#[derive(Debug, Clone, Serialize, Deserialize)]
257pub struct RequestCompleteEvent {
258 /// Correlation ID
259 pub correlation_id: String,
260 /// Final HTTP status code
261 pub status: u16,
262 /// Request duration in milliseconds
263 pub duration_ms: u64,
264 /// Request body size
265 pub request_body_size: usize,
266 /// Response body size
267 pub response_body_size: usize,
268 /// Upstream attempts
269 pub upstream_attempts: u32,
270 /// Error if any
271 pub error: Option<String>,
272}
273
274// ============================================================================
275// WebSocket Frame Events
276// ============================================================================
277
278/// WebSocket frame event
279///
280/// Sent to agents after a WebSocket upgrade when frame inspection is enabled.
281/// Each frame is sent individually for inspection.
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct WebSocketFrameEvent {
284 /// Correlation ID (same as the original HTTP upgrade request)
285 pub correlation_id: String,
286 /// Frame opcode: "text", "binary", "ping", "pong", "close", "continuation"
287 pub opcode: String,
288 /// Frame payload (base64 encoded for JSON transport)
289 pub data: String,
290 /// Direction: true = client->server, false = server->client
291 pub client_to_server: bool,
292 /// Frame index for this connection (0-based, per direction)
293 pub frame_index: u64,
294 /// FIN bit - true if final frame of message (for fragmented messages)
295 pub fin: bool,
296 /// Route ID
297 pub route_id: Option<String>,
298 /// Client IP
299 pub client_ip: String,
300}
301
302/// WebSocket opcode
303#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
304#[serde(rename_all = "snake_case")]
305pub enum WebSocketOpcode {
306 /// Continuation frame (0x0)
307 Continuation,
308 /// Text frame (0x1)
309 Text,
310 /// Binary frame (0x2)
311 Binary,
312 /// Connection close (0x8)
313 Close,
314 /// Ping (0x9)
315 Ping,
316 /// Pong (0xA)
317 Pong,
318}
319
320impl WebSocketOpcode {
321 /// Convert opcode to string representation
322 pub fn as_str(&self) -> &'static str {
323 match self {
324 Self::Continuation => "continuation",
325 Self::Text => "text",
326 Self::Binary => "binary",
327 Self::Close => "close",
328 Self::Ping => "ping",
329 Self::Pong => "pong",
330 }
331 }
332
333 /// Parse from byte value
334 pub fn from_u8(value: u8) -> Option<Self> {
335 match value {
336 0x0 => Some(Self::Continuation),
337 0x1 => Some(Self::Text),
338 0x2 => Some(Self::Binary),
339 0x8 => Some(Self::Close),
340 0x9 => Some(Self::Ping),
341 0xA => Some(Self::Pong),
342 _ => None,
343 }
344 }
345
346 /// Convert to byte value
347 pub fn as_u8(&self) -> u8 {
348 match self {
349 Self::Continuation => 0x0,
350 Self::Text => 0x1,
351 Self::Binary => 0x2,
352 Self::Close => 0x8,
353 Self::Ping => 0x9,
354 Self::Pong => 0xA,
355 }
356 }
357}
358
359/// WebSocket frame decision
360///
361/// Agents return this decision for WebSocket frame events.
362#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
363#[serde(rename_all = "snake_case")]
364pub enum WebSocketDecision {
365 /// Allow frame to pass through
366 #[default]
367 Allow,
368 /// Drop this frame silently (don't forward)
369 Drop,
370 /// Close the WebSocket connection
371 Close {
372 /// Close code (RFC 6455 section 7.4.1)
373 code: u16,
374 /// Close reason
375 reason: String,
376 },
377}
378
379/// Agent request message
380#[derive(Debug, Clone, Serialize, Deserialize)]
381pub struct AgentRequest {
382 /// Protocol version
383 pub version: u32,
384 /// Event type
385 pub event_type: EventType,
386 /// Event payload (JSON)
387 pub payload: serde_json::Value,
388}
389
390/// Agent response message
391#[derive(Debug, Clone, Serialize, Deserialize)]
392pub struct AgentResponse {
393 /// Protocol version
394 pub version: u32,
395 /// Decision
396 pub decision: Decision,
397 /// Header modifications for request
398 #[serde(default)]
399 pub request_headers: Vec<HeaderOp>,
400 /// Header modifications for response
401 #[serde(default)]
402 pub response_headers: Vec<HeaderOp>,
403 /// Routing metadata modifications
404 #[serde(default)]
405 pub routing_metadata: HashMap<String, String>,
406 /// Audit metadata
407 #[serde(default)]
408 pub audit: AuditMetadata,
409
410 // ========================================================================
411 // Streaming-specific fields
412 // ========================================================================
413 /// Agent needs more data to make a final decision
414 ///
415 /// When `true`, the current `decision` is provisional and may change
416 /// after processing more body chunks. The proxy should continue
417 /// streaming body data to this agent.
418 ///
419 /// When `false` (default), the decision is final.
420 #[serde(default)]
421 pub needs_more: bool,
422
423 /// Request body mutation (for streaming mode)
424 ///
425 /// If present, applies the mutation to the current request body chunk.
426 /// Only valid for `RequestBodyChunk` events.
427 #[serde(default)]
428 pub request_body_mutation: Option<BodyMutation>,
429
430 /// Response body mutation (for streaming mode)
431 ///
432 /// If present, applies the mutation to the current response body chunk.
433 /// Only valid for `ResponseBodyChunk` events.
434 #[serde(default)]
435 pub response_body_mutation: Option<BodyMutation>,
436
437 /// WebSocket frame decision
438 ///
439 /// Only valid for `WebSocketFrame` events. If not set, defaults to Allow.
440 #[serde(default)]
441 pub websocket_decision: Option<WebSocketDecision>,
442}
443
444impl AgentResponse {
445 /// Create a default allow response
446 pub fn default_allow() -> Self {
447 Self {
448 version: PROTOCOL_VERSION,
449 decision: Decision::Allow,
450 request_headers: vec![],
451 response_headers: vec![],
452 routing_metadata: HashMap::new(),
453 audit: AuditMetadata::default(),
454 needs_more: false,
455 request_body_mutation: None,
456 response_body_mutation: None,
457 websocket_decision: None,
458 }
459 }
460
461 /// Create a block response
462 pub fn block(status: u16, body: Option<String>) -> Self {
463 Self {
464 version: PROTOCOL_VERSION,
465 decision: Decision::Block {
466 status,
467 body,
468 headers: None,
469 },
470 request_headers: vec![],
471 response_headers: vec![],
472 routing_metadata: HashMap::new(),
473 audit: AuditMetadata::default(),
474 needs_more: false,
475 request_body_mutation: None,
476 response_body_mutation: None,
477 websocket_decision: None,
478 }
479 }
480
481 /// Create a redirect response
482 pub fn redirect(url: String, status: u16) -> Self {
483 Self {
484 version: PROTOCOL_VERSION,
485 decision: Decision::Redirect { url, status },
486 request_headers: vec![],
487 response_headers: vec![],
488 routing_metadata: HashMap::new(),
489 audit: AuditMetadata::default(),
490 needs_more: false,
491 request_body_mutation: None,
492 response_body_mutation: None,
493 websocket_decision: None,
494 }
495 }
496
497 /// Create a streaming response indicating more data is needed
498 pub fn needs_more_data() -> Self {
499 Self {
500 version: PROTOCOL_VERSION,
501 decision: Decision::Allow,
502 request_headers: vec![],
503 response_headers: vec![],
504 routing_metadata: HashMap::new(),
505 audit: AuditMetadata::default(),
506 needs_more: true,
507 request_body_mutation: None,
508 response_body_mutation: None,
509 websocket_decision: None,
510 }
511 }
512
513 /// Create a WebSocket allow response
514 pub fn websocket_allow() -> Self {
515 Self {
516 websocket_decision: Some(WebSocketDecision::Allow),
517 ..Self::default_allow()
518 }
519 }
520
521 /// Create a WebSocket drop response (drop the frame, don't forward)
522 pub fn websocket_drop() -> Self {
523 Self {
524 websocket_decision: Some(WebSocketDecision::Drop),
525 ..Self::default_allow()
526 }
527 }
528
529 /// Create a WebSocket close response (close the connection)
530 pub fn websocket_close(code: u16, reason: String) -> Self {
531 Self {
532 websocket_decision: Some(WebSocketDecision::Close { code, reason }),
533 ..Self::default_allow()
534 }
535 }
536
537 /// Set WebSocket decision
538 pub fn with_websocket_decision(mut self, decision: WebSocketDecision) -> Self {
539 self.websocket_decision = Some(decision);
540 self
541 }
542
543 /// Create a streaming response with body mutation
544 pub fn with_request_body_mutation(mut self, mutation: BodyMutation) -> Self {
545 self.request_body_mutation = Some(mutation);
546 self
547 }
548
549 /// Create a streaming response with response body mutation
550 pub fn with_response_body_mutation(mut self, mutation: BodyMutation) -> Self {
551 self.response_body_mutation = Some(mutation);
552 self
553 }
554
555 /// Set needs_more flag
556 pub fn set_needs_more(mut self, needs_more: bool) -> Self {
557 self.needs_more = needs_more;
558 self
559 }
560
561 /// Add a request header modification
562 pub fn add_request_header(mut self, op: HeaderOp) -> Self {
563 self.request_headers.push(op);
564 self
565 }
566
567 /// Add a response header modification
568 pub fn add_response_header(mut self, op: HeaderOp) -> Self {
569 self.response_headers.push(op);
570 self
571 }
572
573 /// Add audit metadata
574 pub fn with_audit(mut self, audit: AuditMetadata) -> Self {
575 self.audit = audit;
576 self
577 }
578}
579
580/// Audit metadata from agent
581#[derive(Debug, Clone, Default, Serialize, Deserialize)]
582pub struct AuditMetadata {
583 /// Tags for logging/metrics
584 #[serde(default)]
585 pub tags: Vec<String>,
586 /// Rule IDs that matched
587 #[serde(default)]
588 pub rule_ids: Vec<String>,
589 /// Confidence score (0.0 - 1.0)
590 pub confidence: Option<f32>,
591 /// Reason codes
592 #[serde(default)]
593 pub reason_codes: Vec<String>,
594 /// Custom metadata
595 #[serde(default)]
596 pub custom: HashMap<String, serde_json::Value>,
597}