sentinel_agent_protocol/
server.rs

1//! Agent server for implementing external agents.
2//!
3//! Supports two transport mechanisms:
4//! - Unix domain sockets (length-prefixed JSON)
5//! - gRPC (Protocol Buffers over HTTP/2)
6
7use async_trait::async_trait;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::net::{UnixListener, UnixStream};
12use tokio_stream::StreamExt;
13use tonic::{Request, Response, Status, Streaming};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::errors::AgentProtocolError;
17use crate::grpc::{
18    self, agent_processor_server::AgentProcessor, agent_processor_server::AgentProcessorServer,
19};
20use crate::protocol::{
21    AgentRequest, AgentResponse, AuditMetadata, ConfigureEvent, Decision, EventType,
22    GuardrailInspectEvent, HeaderOp, RequestBodyChunkEvent, RequestCompleteEvent,
23    RequestHeadersEvent, RequestMetadata, ResponseBodyChunkEvent, ResponseHeadersEvent,
24    WebSocketDecision, WebSocketFrameEvent, MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
25};
26
/// Agent server for testing and reference implementations
///
/// Listens on a Unix domain socket and forwards every decoded request to a
/// shared [`AgentHandler`].
pub struct AgentServer {
    /// Agent ID, attached to this server's log events
    id: String,
    /// Unix socket path the listener is bound to by `run`
    socket_path: std::path::PathBuf,
    /// Request handler, shared (via `Arc`) with every spawned connection task
    handler: Arc<dyn AgentHandler>,
}
36
/// Trait for implementing agent logic
///
/// One callback per protocol event. Every callback has a default
/// implementation that lets traffic through, so an implementor only needs to
/// override the events it cares about. Handlers are shared between connection
/// tasks behind an `Arc`, hence the `Send + Sync` bound.
#[async_trait]
pub trait AgentHandler: Send + Sync {
    /// Handle a configure event
    ///
    /// Called once when the agent connects, before any request events.
    /// Use this to receive agent-specific configuration from the proxy.
    ///
    /// The default implementation accepts any configuration silently.
    /// Override this to parse and validate your agent's configuration.
    ///
    /// In this file only the Unix-socket transport dispatches this event; the
    /// gRPC `process_event` / `process_event_stream` dispatch has no arm for it.
    async fn on_configure(&self, _event: ConfigureEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a request headers event
    ///
    /// The default implementation returns `AgentResponse::default_allow()`.
    async fn on_request_headers(&self, _event: RequestHeadersEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a request body chunk event
    ///
    /// The default implementation returns `AgentResponse::default_allow()`.
    async fn on_request_body_chunk(&self, _event: RequestBodyChunkEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a response headers event
    ///
    /// The default implementation returns `AgentResponse::default_allow()`.
    async fn on_response_headers(&self, _event: ResponseHeadersEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a response body chunk event
    ///
    /// The default implementation returns `AgentResponse::default_allow()`.
    async fn on_response_body_chunk(&self, _event: ResponseBodyChunkEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a request complete event
    ///
    /// The default implementation returns `AgentResponse::default_allow()`.
    async fn on_request_complete(&self, _event: RequestCompleteEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a WebSocket frame event
    ///
    /// Called for each WebSocket frame when inspection is enabled.
    /// Return `AgentResponse::websocket_allow()` to forward the frame,
    /// `AgentResponse::websocket_drop()` to silently drop it, or
    /// `AgentResponse::websocket_close(code, reason)` to close the connection.
    ///
    /// The default implementation returns `AgentResponse::websocket_allow()`.
    async fn on_websocket_frame(&self, _event: WebSocketFrameEvent) -> AgentResponse {
        AgentResponse::websocket_allow()
    }

    /// Handle a guardrail inspection event
    ///
    /// Called for semantic content analysis (prompt injection detection, PII detection).
    /// Return `AgentResponse::default_allow()` if no issues detected.
    /// The guardrail-specific response data should be returned via a separate mechanism
    /// (the caller will interpret the response based on context).
    ///
    /// In this file only the Unix-socket transport dispatches this event; the
    /// gRPC `process_event` / `process_event_stream` dispatch has no arm for it.
    async fn on_guardrail_inspect(&self, _event: GuardrailInspectEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }
}
96
97impl AgentServer {
98    /// Create a new agent server
99    pub fn new(
100        id: impl Into<String>,
101        socket_path: impl Into<std::path::PathBuf>,
102        handler: Box<dyn AgentHandler>,
103    ) -> Self {
104        let id = id.into();
105        let socket_path = socket_path.into();
106
107        debug!(
108            agent_id = %id,
109            socket_path = %socket_path.display(),
110            "Creating agent server"
111        );
112
113        Self {
114            id,
115            socket_path,
116            handler: Arc::from(handler),
117        }
118    }
119
120    /// Start the agent server
121    pub async fn run(&self) -> Result<(), AgentProtocolError> {
122        // Remove existing socket file if it exists
123        if self.socket_path.exists() {
124            trace!(
125                agent_id = %self.id,
126                socket_path = %self.socket_path.display(),
127                "Removing existing socket file"
128            );
129            std::fs::remove_file(&self.socket_path)?;
130        }
131
132        // Create Unix socket listener
133        let listener = UnixListener::bind(&self.socket_path)?;
134
135        info!(
136            agent_id = %self.id,
137            socket_path = %self.socket_path.display(),
138            "Agent server listening"
139        );
140
141        loop {
142            match listener.accept().await {
143                Ok((stream, _addr)) => {
144                    trace!(
145                        agent_id = %self.id,
146                        "Accepted new connection"
147                    );
148                    let handler = Arc::clone(&self.handler);
149                    let agent_id = self.id.clone();
150                    tokio::spawn(async move {
151                        if let Err(e) = Self::handle_connection(stream, handler.as_ref()).await {
152                            error!(
153                                agent_id = %agent_id,
154                                error = %e,
155                                "Error handling agent connection"
156                            );
157                        }
158                    });
159                }
160                Err(e) => {
161                    error!(
162                        agent_id = %self.id,
163                        error = %e,
164                        "Failed to accept connection"
165                    );
166                }
167            }
168        }
169    }
170
171    /// Handle a single connection
172    async fn handle_connection(
173        mut stream: UnixStream,
174        handler: &dyn AgentHandler,
175    ) -> Result<(), AgentProtocolError> {
176        trace!("Starting connection handler");
177
178        loop {
179            // Read message length
180            let mut len_bytes = [0u8; 4];
181            match stream.read_exact(&mut len_bytes).await {
182                Ok(_) => {}
183                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
184                    // Client disconnected
185                    trace!("Client disconnected (EOF)");
186                    return Ok(());
187                }
188                Err(e) => {
189                    error!(error = %e, "Error reading message length");
190                    return Err(e.into());
191                }
192            }
193
194            let message_len = u32::from_be_bytes(len_bytes) as usize;
195
196            // Check message size
197            if message_len > MAX_MESSAGE_SIZE {
198                warn!(
199                    message_len = message_len,
200                    max_size = MAX_MESSAGE_SIZE,
201                    "Message too large"
202                );
203                return Err(AgentProtocolError::MessageTooLarge {
204                    size: message_len,
205                    max: MAX_MESSAGE_SIZE,
206                });
207            }
208
209            trace!(message_len = message_len, "Reading message data");
210
211            // Read message data
212            let mut buffer = vec![0u8; message_len];
213            stream.read_exact(&mut buffer).await?;
214
215            // Parse request
216            let request: AgentRequest = serde_json::from_slice(&buffer)
217                .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
218
219            trace!(
220                event_type = ?request.event_type,
221                version = request.version,
222                "Received agent request"
223            );
224
225            // Handle request based on event type
226            let response = match request.event_type {
227                EventType::Configure => {
228                    let event: ConfigureEvent = serde_json::from_value(request.payload)
229                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
230                    trace!(
231                        agent_id = %event.agent_id,
232                        "Processing configure event"
233                    );
234                    handler.on_configure(event).await
235                }
236                EventType::RequestHeaders => {
237                    let event: RequestHeadersEvent = serde_json::from_value(request.payload)
238                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
239                    trace!(
240                        correlation_id = %event.metadata.correlation_id,
241                        method = %event.method,
242                        uri = %event.uri,
243                        "Processing request_headers event"
244                    );
245                    handler.on_request_headers(event).await
246                }
247                EventType::RequestBodyChunk => {
248                    let event: RequestBodyChunkEvent = serde_json::from_value(request.payload)
249                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
250                    trace!(
251                        correlation_id = %event.correlation_id,
252                        is_last = event.is_last,
253                        data_len = event.data.len(),
254                        "Processing request_body_chunk event"
255                    );
256                    handler.on_request_body_chunk(event).await
257                }
258                EventType::ResponseHeaders => {
259                    let event: ResponseHeadersEvent = serde_json::from_value(request.payload)
260                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
261                    trace!(
262                        correlation_id = %event.correlation_id,
263                        status = event.status,
264                        "Processing response_headers event"
265                    );
266                    handler.on_response_headers(event).await
267                }
268                EventType::ResponseBodyChunk => {
269                    let event: ResponseBodyChunkEvent = serde_json::from_value(request.payload)
270                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
271                    trace!(
272                        correlation_id = %event.correlation_id,
273                        is_last = event.is_last,
274                        data_len = event.data.len(),
275                        "Processing response_body_chunk event"
276                    );
277                    handler.on_response_body_chunk(event).await
278                }
279                EventType::RequestComplete => {
280                    let event: RequestCompleteEvent = serde_json::from_value(request.payload)
281                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
282                    trace!(
283                        correlation_id = %event.correlation_id,
284                        status = event.status,
285                        duration_ms = event.duration_ms,
286                        "Processing request_complete event"
287                    );
288                    handler.on_request_complete(event).await
289                }
290                EventType::WebSocketFrame => {
291                    let event: WebSocketFrameEvent = serde_json::from_value(request.payload)
292                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
293                    trace!(
294                        correlation_id = %event.correlation_id,
295                        opcode = %event.opcode,
296                        frame_index = event.frame_index,
297                        client_to_server = event.client_to_server,
298                        "Processing websocket_frame event"
299                    );
300                    handler.on_websocket_frame(event).await
301                }
302                EventType::GuardrailInspect => {
303                    let event: GuardrailInspectEvent = serde_json::from_value(request.payload)
304                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
305                    trace!(
306                        correlation_id = %event.correlation_id,
307                        inspection_type = ?event.inspection_type,
308                        content_len = event.content.len(),
309                        "Processing guardrail_inspect event"
310                    );
311                    handler.on_guardrail_inspect(event).await
312                }
313            };
314
315            trace!(
316                decision = ?response.decision,
317                "Sending agent response"
318            );
319
320            // Send response
321            let response_bytes = serde_json::to_vec(&response)
322                .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?;
323
324            // Write message length
325            let len_bytes = (response_bytes.len() as u32).to_be_bytes();
326            stream.write_all(&len_bytes).await?;
327            // Write message data
328            stream.write_all(&response_bytes).await?;
329            stream.flush().await?;
330
331            trace!(response_len = response_bytes.len(), "Response sent");
332        }
333    }
334}
335
336/// Reference implementation: Echo agent (for testing)
337pub struct EchoAgent;
338
339#[async_trait]
340impl AgentHandler for EchoAgent {
341    async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
342        debug!(
343            "Echo agent: request headers for {}",
344            event.metadata.correlation_id
345        );
346
347        // Echo back correlation ID as a header
348        AgentResponse::default_allow()
349            .add_request_header(HeaderOp::Set {
350                name: "X-Echo-Agent".to_string(),
351                value: event.metadata.correlation_id.clone(),
352            })
353            .with_audit(AuditMetadata {
354                tags: vec!["echo".to_string()],
355                ..Default::default()
356            })
357    }
358}
359
/// Reference implementation: Denylist agent
///
/// Blocks requests by URI prefix or by exact client IP string.
pub struct DenylistAgent {
    /// URI prefixes to reject (matched with `starts_with`)
    blocked_paths: Vec<String>,
    /// Client IP strings to reject (matched by exact equality)
    blocked_ips: Vec<String>,
}

impl DenylistAgent {
    /// Create a denylist agent from the given path prefixes and client IPs.
    pub fn new(blocked_paths: Vec<String>, blocked_ips: Vec<String>) -> Self {
        DenylistAgent {
            blocked_paths,
            blocked_ips,
        }
    }
}
374
375#[async_trait]
376impl AgentHandler for DenylistAgent {
377    async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
378        trace!(
379            correlation_id = %event.metadata.correlation_id,
380            uri = %event.uri,
381            client_ip = %event.metadata.client_ip,
382            "Denylist agent checking request"
383        );
384
385        // Check if path is blocked
386        for blocked_path in &self.blocked_paths {
387            if event.uri.starts_with(blocked_path) {
388                debug!(
389                    correlation_id = %event.metadata.correlation_id,
390                    blocked_path = %blocked_path,
391                    uri = %event.uri,
392                    "Blocking request: path matched denylist"
393                );
394                return AgentResponse::block(403, Some("Forbidden path".to_string())).with_audit(
395                    AuditMetadata {
396                        tags: vec!["denylist".to_string(), "blocked_path".to_string()],
397                        reason_codes: vec!["PATH_BLOCKED".to_string()],
398                        ..Default::default()
399                    },
400                );
401            }
402        }
403
404        // Check if IP is blocked
405        if self.blocked_ips.contains(&event.metadata.client_ip) {
406            debug!(
407                correlation_id = %event.metadata.correlation_id,
408                client_ip = %event.metadata.client_ip,
409                "Blocking request: IP matched denylist"
410            );
411            return AgentResponse::block(403, Some("Forbidden IP".to_string())).with_audit(
412                AuditMetadata {
413                    tags: vec!["denylist".to_string(), "blocked_ip".to_string()],
414                    reason_codes: vec!["IP_BLOCKED".to_string()],
415                    ..Default::default()
416                },
417            );
418        }
419
420        trace!(
421            correlation_id = %event.metadata.correlation_id,
422            "Request allowed by denylist agent"
423        );
424        AgentResponse::default_allow()
425    }
426}
427
428// ============================================================================
429// gRPC Server Implementation
430// ============================================================================
431
/// gRPC agent server for implementing external agents
///
/// Wraps an [`AgentHandler`] so it can be exposed as a tonic
/// `AgentProcessorServer` service (see `into_service` and `run`).
pub struct GrpcAgentServer {
    /// Agent ID, attached to this server's log events
    id: String,
    /// Request handler that gRPC events are dispatched to
    handler: Arc<dyn AgentHandler>,
}
439
440impl GrpcAgentServer {
441    /// Create a new gRPC agent server
442    pub fn new(id: impl Into<String>, handler: Box<dyn AgentHandler>) -> Self {
443        let id = id.into();
444        debug!(agent_id = %id, "Creating gRPC agent server");
445        Self {
446            id,
447            handler: Arc::from(handler),
448        }
449    }
450
451    /// Get the tonic service for this agent
452    pub fn into_service(self) -> AgentProcessorServer<GrpcAgentHandler> {
453        trace!(agent_id = %self.id, "Converting to tonic service");
454        AgentProcessorServer::new(GrpcAgentHandler {
455            id: self.id,
456            handler: self.handler,
457        })
458    }
459
460    /// Start the gRPC server on the given address
461    pub async fn run(self, addr: SocketAddr) -> Result<(), AgentProtocolError> {
462        info!(
463            agent_id = %self.id,
464            address = %addr,
465            "gRPC agent server listening"
466        );
467
468        tonic::transport::Server::builder()
469            .add_service(self.into_service())
470            .serve(addr)
471            .await
472            .map_err(|e| {
473                error!(error = %e, "gRPC server error");
474                AgentProtocolError::ConnectionFailed(format!("gRPC server error: {}", e))
475            })
476    }
477}
478
/// Internal handler that implements the gRPC AgentProcessor trait
///
/// Converts incoming gRPC events to the internal event types, forwards them
/// to the wrapped [`AgentHandler`], and converts the response back.
pub struct GrpcAgentHandler {
    /// Agent ID, attached to this handler's log events
    id: String,
    /// Handler that converted events are dispatched to
    handler: Arc<dyn AgentHandler>,
}
484
485#[tonic::async_trait]
486impl AgentProcessor for GrpcAgentHandler {
487    async fn process_event(
488        &self,
489        request: Request<grpc::AgentRequest>,
490    ) -> Result<Response<grpc::AgentResponse>, Status> {
491        let grpc_request = request.into_inner();
492
493        trace!(
494            agent_id = %self.id,
495            event_type = grpc_request.event_type,
496            version = grpc_request.version,
497            "Processing gRPC event"
498        );
499
500        // Convert gRPC event to internal event and dispatch
501        let response = match grpc_request.event {
502            Some(grpc::agent_request::Event::RequestHeaders(e)) => {
503                let event = Self::convert_request_headers_from_grpc(e);
504                trace!(
505                    agent_id = %self.id,
506                    correlation_id = %event.metadata.correlation_id,
507                    "Processing request_headers via gRPC"
508                );
509                self.handler.on_request_headers(event).await
510            }
511            Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
512                let event = Self::convert_request_body_chunk_from_grpc(e);
513                trace!(
514                    agent_id = %self.id,
515                    correlation_id = %event.correlation_id,
516                    "Processing request_body_chunk via gRPC"
517                );
518                self.handler.on_request_body_chunk(event).await
519            }
520            Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
521                let event = Self::convert_response_headers_from_grpc(e);
522                trace!(
523                    agent_id = %self.id,
524                    correlation_id = %event.correlation_id,
525                    "Processing response_headers via gRPC"
526                );
527                self.handler.on_response_headers(event).await
528            }
529            Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
530                let event = Self::convert_response_body_chunk_from_grpc(e);
531                trace!(
532                    agent_id = %self.id,
533                    correlation_id = %event.correlation_id,
534                    "Processing response_body_chunk via gRPC"
535                );
536                self.handler.on_response_body_chunk(event).await
537            }
538            Some(grpc::agent_request::Event::RequestComplete(e)) => {
539                let event = Self::convert_request_complete_from_grpc(e);
540                trace!(
541                    agent_id = %self.id,
542                    correlation_id = %event.correlation_id,
543                    "Processing request_complete via gRPC"
544                );
545                self.handler.on_request_complete(event).await
546            }
547            Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
548                let event = Self::convert_websocket_frame_from_grpc(e);
549                trace!(
550                    agent_id = %self.id,
551                    correlation_id = %event.correlation_id,
552                    opcode = %event.opcode,
553                    "Processing websocket_frame via gRPC"
554                );
555                self.handler.on_websocket_frame(event).await
556            }
557            None => {
558                warn!(agent_id = %self.id, "Missing event in gRPC request");
559                return Err(Status::invalid_argument("Missing event in request"));
560            }
561        };
562
563        trace!(
564            agent_id = %self.id,
565            decision = ?response.decision,
566            "Returning gRPC response"
567        );
568
569        // Convert internal response to gRPC response
570        let grpc_response = Self::convert_response_to_grpc(response);
571        Ok(Response::new(grpc_response))
572    }
573
574    async fn process_event_stream(
575        &self,
576        request: Request<Streaming<grpc::AgentRequest>>,
577    ) -> Result<Response<grpc::AgentResponse>, Status> {
578        let mut stream = request.into_inner();
579
580        trace!(agent_id = %self.id, "Processing gRPC event stream");
581
582        // Process all events in the stream, returning the final response
583        let mut final_response = AgentResponse::default_allow();
584        let mut event_count = 0u32;
585
586        while let Some(result) = stream.next().await {
587            let grpc_request = result.map_err(|e| {
588                error!(agent_id = %self.id, error = %e, "Stream error");
589                Status::internal(format!("Stream error: {}", e))
590            })?;
591
592            event_count += 1;
593            trace!(
594                agent_id = %self.id,
595                event_count = event_count,
596                "Processing stream event"
597            );
598
599            let response = match grpc_request.event {
600                Some(grpc::agent_request::Event::RequestHeaders(e)) => {
601                    let event = Self::convert_request_headers_from_grpc(e);
602                    self.handler.on_request_headers(event).await
603                }
604                Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
605                    let event = Self::convert_request_body_chunk_from_grpc(e);
606                    self.handler.on_request_body_chunk(event).await
607                }
608                Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
609                    let event = Self::convert_response_headers_from_grpc(e);
610                    self.handler.on_response_headers(event).await
611                }
612                Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
613                    let event = Self::convert_response_body_chunk_from_grpc(e);
614                    self.handler.on_response_body_chunk(event).await
615                }
616                Some(grpc::agent_request::Event::RequestComplete(e)) => {
617                    let event = Self::convert_request_complete_from_grpc(e);
618                    self.handler.on_request_complete(event).await
619                }
620                Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
621                    let event = Self::convert_websocket_frame_from_grpc(e);
622                    self.handler.on_websocket_frame(event).await
623                }
624                None => continue,
625            };
626
627            // If any event results in a block/redirect, that becomes the final response
628            if !matches!(response.decision, Decision::Allow) {
629                debug!(
630                    agent_id = %self.id,
631                    decision = ?response.decision,
632                    event_count = event_count,
633                    "Non-allow decision in stream, terminating early"
634                );
635                final_response = response;
636                break;
637            }
638            final_response = response;
639        }
640
641        trace!(
642            agent_id = %self.id,
643            event_count = event_count,
644            decision = ?final_response.decision,
645            "Stream processing complete"
646        );
647
648        let grpc_response = Self::convert_response_to_grpc(final_response);
649        Ok(Response::new(grpc_response))
650    }
651}
652
653impl GrpcAgentHandler {
654    /// Convert gRPC RequestHeadersEvent to internal format
655    fn convert_request_headers_from_grpc(e: grpc::RequestHeadersEvent) -> RequestHeadersEvent {
656        RequestHeadersEvent {
657            metadata: Self::convert_metadata_from_grpc(e.metadata),
658            method: e.method,
659            uri: e.uri,
660            headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
661        }
662    }
663
664    /// Convert gRPC RequestBodyChunkEvent to internal format
665    fn convert_request_body_chunk_from_grpc(
666        e: grpc::RequestBodyChunkEvent,
667    ) -> RequestBodyChunkEvent {
668        RequestBodyChunkEvent {
669            correlation_id: e.correlation_id,
670            data: String::from_utf8_lossy(&e.data).to_string(),
671            is_last: e.is_last,
672            total_size: e.total_size.map(|s| s as usize),
673            chunk_index: e.chunk_index,
674            bytes_received: e.bytes_received as usize,
675        }
676    }
677
678    /// Convert gRPC ResponseHeadersEvent to internal format
679    fn convert_response_headers_from_grpc(e: grpc::ResponseHeadersEvent) -> ResponseHeadersEvent {
680        ResponseHeadersEvent {
681            correlation_id: e.correlation_id,
682            status: e.status as u16,
683            headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
684        }
685    }
686
687    /// Convert gRPC ResponseBodyChunkEvent to internal format
688    fn convert_response_body_chunk_from_grpc(
689        e: grpc::ResponseBodyChunkEvent,
690    ) -> ResponseBodyChunkEvent {
691        ResponseBodyChunkEvent {
692            correlation_id: e.correlation_id,
693            data: String::from_utf8_lossy(&e.data).to_string(),
694            is_last: e.is_last,
695            total_size: e.total_size.map(|s| s as usize),
696            chunk_index: e.chunk_index,
697            bytes_sent: e.bytes_sent as usize,
698        }
699    }
700
701    /// Convert gRPC RequestCompleteEvent to internal format
702    fn convert_request_complete_from_grpc(e: grpc::RequestCompleteEvent) -> RequestCompleteEvent {
703        RequestCompleteEvent {
704            correlation_id: e.correlation_id,
705            status: e.status as u16,
706            duration_ms: e.duration_ms,
707            request_body_size: e.request_body_size as usize,
708            response_body_size: e.response_body_size as usize,
709            upstream_attempts: e.upstream_attempts,
710            error: e.error,
711        }
712    }
713
714    /// Convert gRPC WebSocketFrameEvent to internal format
715    fn convert_websocket_frame_from_grpc(e: grpc::WebSocketFrameEvent) -> WebSocketFrameEvent {
716        use base64::{engine::general_purpose::STANDARD, Engine as _};
717        WebSocketFrameEvent {
718            correlation_id: e.correlation_id,
719            opcode: e.opcode,
720            data: STANDARD.encode(&e.data),
721            client_to_server: e.client_to_server,
722            frame_index: e.frame_index,
723            fin: e.fin,
724            route_id: e.route_id,
725            client_ip: e.client_ip,
726        }
727    }
728
729    /// Convert gRPC metadata to internal format
730    fn convert_metadata_from_grpc(metadata: Option<grpc::RequestMetadata>) -> RequestMetadata {
731        match metadata {
732            Some(m) => RequestMetadata {
733                correlation_id: m.correlation_id,
734                request_id: m.request_id,
735                client_ip: m.client_ip,
736                client_port: m.client_port as u16,
737                server_name: m.server_name,
738                protocol: m.protocol,
739                tls_version: m.tls_version,
740                tls_cipher: m.tls_cipher,
741                route_id: m.route_id,
742                upstream_id: m.upstream_id,
743                timestamp: m.timestamp,
744                traceparent: m.traceparent,
745            },
746            None => RequestMetadata {
747                correlation_id: String::new(),
748                request_id: String::new(),
749                client_ip: String::new(),
750                client_port: 0,
751                server_name: None,
752                protocol: String::new(),
753                tls_version: None,
754                tls_cipher: None,
755                route_id: None,
756                upstream_id: None,
757                timestamp: String::new(),
758                traceparent: None,
759            },
760        }
761    }
762
763    /// Convert internal response to gRPC format
764    fn convert_response_to_grpc(response: AgentResponse) -> grpc::AgentResponse {
765        let decision = match response.decision {
766            Decision::Allow => Some(grpc::agent_response::Decision::Allow(
767                grpc::AllowDecision {},
768            )),
769            Decision::Block {
770                status,
771                body,
772                headers,
773            } => Some(grpc::agent_response::Decision::Block(grpc::BlockDecision {
774                status: status as u32,
775                body,
776                headers: headers.unwrap_or_default(),
777            })),
778            Decision::Redirect { url, status } => Some(grpc::agent_response::Decision::Redirect(
779                grpc::RedirectDecision {
780                    url,
781                    status: status as u32,
782                },
783            )),
784            Decision::Challenge {
785                challenge_type,
786                params,
787            } => Some(grpc::agent_response::Decision::Challenge(
788                grpc::ChallengeDecision {
789                    challenge_type,
790                    params,
791                },
792            )),
793        };
794
795        let request_headers: Vec<grpc::HeaderOp> = response
796            .request_headers
797            .into_iter()
798            .map(Self::convert_header_op_to_grpc)
799            .collect();
800
801        let response_headers: Vec<grpc::HeaderOp> = response
802            .response_headers
803            .into_iter()
804            .map(Self::convert_header_op_to_grpc)
805            .collect();
806
807        let audit = Some(grpc::AuditMetadata {
808            tags: response.audit.tags,
809            rule_ids: response.audit.rule_ids,
810            confidence: response.audit.confidence,
811            reason_codes: response.audit.reason_codes,
812            custom: response
813                .audit
814                .custom
815                .into_iter()
816                .map(|(k, v)| (k, v.to_string()))
817                .collect(),
818        });
819
820        // Convert body mutations
821        let request_body_mutation = response.request_body_mutation.map(|m| grpc::BodyMutation {
822            data: m.data.map(|d| d.into_bytes()),
823            chunk_index: m.chunk_index,
824        });
825
826        let response_body_mutation = response.response_body_mutation.map(|m| grpc::BodyMutation {
827            data: m.data.map(|d| d.into_bytes()),
828            chunk_index: m.chunk_index,
829        });
830
831        // Convert WebSocket decision
832        let websocket_decision = response
833            .websocket_decision
834            .map(|ws_decision| match ws_decision {
835                WebSocketDecision::Allow => {
836                    grpc::agent_response::WebsocketDecision::WebsocketAllow(
837                        grpc::WebSocketAllowDecision {},
838                    )
839                }
840                WebSocketDecision::Drop => grpc::agent_response::WebsocketDecision::WebsocketDrop(
841                    grpc::WebSocketDropDecision {},
842                ),
843                WebSocketDecision::Close { code, reason } => {
844                    grpc::agent_response::WebsocketDecision::WebsocketClose(
845                        grpc::WebSocketCloseDecision {
846                            code: code as u32,
847                            reason,
848                        },
849                    )
850                }
851            });
852
853        grpc::AgentResponse {
854            version: PROTOCOL_VERSION,
855            decision,
856            request_headers,
857            response_headers,
858            routing_metadata: response.routing_metadata,
859            audit,
860            needs_more: response.needs_more,
861            request_body_mutation,
862            response_body_mutation,
863            websocket_decision,
864        }
865    }
866
867    /// Convert internal header operation to gRPC format
868    fn convert_header_op_to_grpc(op: HeaderOp) -> grpc::HeaderOp {
869        let operation = match op {
870            HeaderOp::Set { name, value } => {
871                Some(grpc::header_op::Operation::Set(grpc::SetHeader {
872                    name,
873                    value,
874                }))
875            }
876            HeaderOp::Add { name, value } => {
877                Some(grpc::header_op::Operation::Add(grpc::AddHeader {
878                    name,
879                    value,
880                }))
881            }
882            HeaderOp::Remove { name } => {
883                Some(grpc::header_op::Operation::Remove(grpc::RemoveHeader {
884                    name,
885                }))
886            }
887        };
888        grpc::HeaderOp { operation }
889    }
890}