sentinel_agent_protocol/
server.rs

1//! Agent server for implementing external agents.
2//!
3//! Supports two transport mechanisms:
4//! - Unix domain sockets (length-prefixed JSON)
5//! - gRPC (Protocol Buffers over HTTP/2)
6
7use async_trait::async_trait;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::net::{UnixListener, UnixStream};
12use tokio_stream::StreamExt;
13use tonic::{Request, Response, Status, Streaming};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::errors::AgentProtocolError;
17use crate::grpc::{
18    self, agent_processor_server::AgentProcessor, agent_processor_server::AgentProcessorServer,
19};
20use crate::protocol::{
21    AgentRequest, AgentResponse, AuditMetadata, Decision, EventType, HeaderOp,
22    RequestBodyChunkEvent, RequestCompleteEvent, RequestHeadersEvent, RequestMetadata,
23    ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketDecision, WebSocketFrameEvent,
24    MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
25};
26
27/// Agent server for testing and reference implementations
28pub struct AgentServer {
29    /// Agent ID
30    id: String,
31    /// Unix socket path
32    socket_path: std::path::PathBuf,
33    /// Request handler
34    handler: Arc<dyn AgentHandler>,
35}
36
37/// Trait for implementing agent logic
38#[async_trait]
39pub trait AgentHandler: Send + Sync {
40    /// Handle a request headers event
41    async fn on_request_headers(&self, _event: RequestHeadersEvent) -> AgentResponse {
42        AgentResponse::default_allow()
43    }
44
45    /// Handle a request body chunk event
46    async fn on_request_body_chunk(&self, _event: RequestBodyChunkEvent) -> AgentResponse {
47        AgentResponse::default_allow()
48    }
49
50    /// Handle a response headers event
51    async fn on_response_headers(&self, _event: ResponseHeadersEvent) -> AgentResponse {
52        AgentResponse::default_allow()
53    }
54
55    /// Handle a response body chunk event
56    async fn on_response_body_chunk(&self, _event: ResponseBodyChunkEvent) -> AgentResponse {
57        AgentResponse::default_allow()
58    }
59
60    /// Handle a request complete event
61    async fn on_request_complete(&self, _event: RequestCompleteEvent) -> AgentResponse {
62        AgentResponse::default_allow()
63    }
64
65    /// Handle a WebSocket frame event
66    ///
67    /// Called for each WebSocket frame when inspection is enabled.
68    /// Return `AgentResponse::websocket_allow()` to forward the frame,
69    /// `AgentResponse::websocket_drop()` to silently drop it, or
70    /// `AgentResponse::websocket_close(code, reason)` to close the connection.
71    async fn on_websocket_frame(&self, _event: WebSocketFrameEvent) -> AgentResponse {
72        AgentResponse::websocket_allow()
73    }
74}
75
76impl AgentServer {
77    /// Create a new agent server
78    pub fn new(
79        id: impl Into<String>,
80        socket_path: impl Into<std::path::PathBuf>,
81        handler: Box<dyn AgentHandler>,
82    ) -> Self {
83        let id = id.into();
84        let socket_path = socket_path.into();
85
86        debug!(
87            agent_id = %id,
88            socket_path = %socket_path.display(),
89            "Creating agent server"
90        );
91
92        Self {
93            id,
94            socket_path,
95            handler: Arc::from(handler),
96        }
97    }
98
99    /// Start the agent server
100    pub async fn run(&self) -> Result<(), AgentProtocolError> {
101        // Remove existing socket file if it exists
102        if self.socket_path.exists() {
103            trace!(
104                agent_id = %self.id,
105                socket_path = %self.socket_path.display(),
106                "Removing existing socket file"
107            );
108            std::fs::remove_file(&self.socket_path)?;
109        }
110
111        // Create Unix socket listener
112        let listener = UnixListener::bind(&self.socket_path)?;
113
114        info!(
115            agent_id = %self.id,
116            socket_path = %self.socket_path.display(),
117            "Agent server listening"
118        );
119
120        loop {
121            match listener.accept().await {
122                Ok((stream, _addr)) => {
123                    trace!(
124                        agent_id = %self.id,
125                        "Accepted new connection"
126                    );
127                    let handler = Arc::clone(&self.handler);
128                    let agent_id = self.id.clone();
129                    tokio::spawn(async move {
130                        if let Err(e) = Self::handle_connection(stream, handler.as_ref()).await {
131                            error!(
132                                agent_id = %agent_id,
133                                error = %e,
134                                "Error handling agent connection"
135                            );
136                        }
137                    });
138                }
139                Err(e) => {
140                    error!(
141                        agent_id = %self.id,
142                        error = %e,
143                        "Failed to accept connection"
144                    );
145                }
146            }
147        }
148    }
149
150    /// Handle a single connection
151    async fn handle_connection(
152        mut stream: UnixStream,
153        handler: &dyn AgentHandler,
154    ) -> Result<(), AgentProtocolError> {
155        trace!("Starting connection handler");
156
157        loop {
158            // Read message length
159            let mut len_bytes = [0u8; 4];
160            match stream.read_exact(&mut len_bytes).await {
161                Ok(_) => {}
162                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
163                    // Client disconnected
164                    trace!("Client disconnected (EOF)");
165                    return Ok(());
166                }
167                Err(e) => {
168                    error!(error = %e, "Error reading message length");
169                    return Err(e.into());
170                }
171            }
172
173            let message_len = u32::from_be_bytes(len_bytes) as usize;
174
175            // Check message size
176            if message_len > MAX_MESSAGE_SIZE {
177                warn!(
178                    message_len = message_len,
179                    max_size = MAX_MESSAGE_SIZE,
180                    "Message too large"
181                );
182                return Err(AgentProtocolError::MessageTooLarge {
183                    size: message_len,
184                    max: MAX_MESSAGE_SIZE,
185                });
186            }
187
188            trace!(message_len = message_len, "Reading message data");
189
190            // Read message data
191            let mut buffer = vec![0u8; message_len];
192            stream.read_exact(&mut buffer).await?;
193
194            // Parse request
195            let request: AgentRequest = serde_json::from_slice(&buffer)
196                .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
197
198            trace!(
199                event_type = ?request.event_type,
200                version = request.version,
201                "Received agent request"
202            );
203
204            // Handle request based on event type
205            let response = match request.event_type {
206                EventType::RequestHeaders => {
207                    let event: RequestHeadersEvent = serde_json::from_value(request.payload)
208                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
209                    trace!(
210                        correlation_id = %event.metadata.correlation_id,
211                        method = %event.method,
212                        uri = %event.uri,
213                        "Processing request_headers event"
214                    );
215                    handler.on_request_headers(event).await
216                }
217                EventType::RequestBodyChunk => {
218                    let event: RequestBodyChunkEvent = serde_json::from_value(request.payload)
219                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
220                    trace!(
221                        correlation_id = %event.correlation_id,
222                        is_last = event.is_last,
223                        data_len = event.data.len(),
224                        "Processing request_body_chunk event"
225                    );
226                    handler.on_request_body_chunk(event).await
227                }
228                EventType::ResponseHeaders => {
229                    let event: ResponseHeadersEvent = serde_json::from_value(request.payload)
230                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
231                    trace!(
232                        correlation_id = %event.correlation_id,
233                        status = event.status,
234                        "Processing response_headers event"
235                    );
236                    handler.on_response_headers(event).await
237                }
238                EventType::ResponseBodyChunk => {
239                    let event: ResponseBodyChunkEvent = serde_json::from_value(request.payload)
240                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
241                    trace!(
242                        correlation_id = %event.correlation_id,
243                        is_last = event.is_last,
244                        data_len = event.data.len(),
245                        "Processing response_body_chunk event"
246                    );
247                    handler.on_response_body_chunk(event).await
248                }
249                EventType::RequestComplete => {
250                    let event: RequestCompleteEvent = serde_json::from_value(request.payload)
251                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
252                    trace!(
253                        correlation_id = %event.correlation_id,
254                        status = event.status,
255                        duration_ms = event.duration_ms,
256                        "Processing request_complete event"
257                    );
258                    handler.on_request_complete(event).await
259                }
260                EventType::WebSocketFrame => {
261                    let event: WebSocketFrameEvent = serde_json::from_value(request.payload)
262                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
263                    trace!(
264                        correlation_id = %event.correlation_id,
265                        opcode = %event.opcode,
266                        frame_index = event.frame_index,
267                        client_to_server = event.client_to_server,
268                        "Processing websocket_frame event"
269                    );
270                    handler.on_websocket_frame(event).await
271                }
272            };
273
274            trace!(
275                decision = ?response.decision,
276                "Sending agent response"
277            );
278
279            // Send response
280            let response_bytes = serde_json::to_vec(&response)
281                .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?;
282
283            // Write message length
284            let len_bytes = (response_bytes.len() as u32).to_be_bytes();
285            stream.write_all(&len_bytes).await?;
286            // Write message data
287            stream.write_all(&response_bytes).await?;
288            stream.flush().await?;
289
290            trace!(response_len = response_bytes.len(), "Response sent");
291        }
292    }
293}
294
295/// Reference implementation: Echo agent (for testing)
296pub struct EchoAgent;
297
298#[async_trait]
299impl AgentHandler for EchoAgent {
300    async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
301        debug!(
302            "Echo agent: request headers for {}",
303            event.metadata.correlation_id
304        );
305
306        // Echo back correlation ID as a header
307        AgentResponse::default_allow()
308            .add_request_header(HeaderOp::Set {
309                name: "X-Echo-Agent".to_string(),
310                value: event.metadata.correlation_id.clone(),
311            })
312            .with_audit(AuditMetadata {
313                tags: vec!["echo".to_string()],
314                ..Default::default()
315            })
316    }
317}
318
319/// Reference implementation: Denylist agent
320pub struct DenylistAgent {
321    blocked_paths: Vec<String>,
322    blocked_ips: Vec<String>,
323}
324
325impl DenylistAgent {
326    pub fn new(blocked_paths: Vec<String>, blocked_ips: Vec<String>) -> Self {
327        Self {
328            blocked_paths,
329            blocked_ips,
330        }
331    }
332}
333
334#[async_trait]
335impl AgentHandler for DenylistAgent {
336    async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
337        trace!(
338            correlation_id = %event.metadata.correlation_id,
339            uri = %event.uri,
340            client_ip = %event.metadata.client_ip,
341            "Denylist agent checking request"
342        );
343
344        // Check if path is blocked
345        for blocked_path in &self.blocked_paths {
346            if event.uri.starts_with(blocked_path) {
347                debug!(
348                    correlation_id = %event.metadata.correlation_id,
349                    blocked_path = %blocked_path,
350                    uri = %event.uri,
351                    "Blocking request: path matched denylist"
352                );
353                return AgentResponse::block(403, Some("Forbidden path".to_string())).with_audit(
354                    AuditMetadata {
355                        tags: vec!["denylist".to_string(), "blocked_path".to_string()],
356                        reason_codes: vec!["PATH_BLOCKED".to_string()],
357                        ..Default::default()
358                    },
359                );
360            }
361        }
362
363        // Check if IP is blocked
364        if self.blocked_ips.contains(&event.metadata.client_ip) {
365            debug!(
366                correlation_id = %event.metadata.correlation_id,
367                client_ip = %event.metadata.client_ip,
368                "Blocking request: IP matched denylist"
369            );
370            return AgentResponse::block(403, Some("Forbidden IP".to_string())).with_audit(
371                AuditMetadata {
372                    tags: vec!["denylist".to_string(), "blocked_ip".to_string()],
373                    reason_codes: vec!["IP_BLOCKED".to_string()],
374                    ..Default::default()
375                },
376            );
377        }
378
379        trace!(
380            correlation_id = %event.metadata.correlation_id,
381            "Request allowed by denylist agent"
382        );
383        AgentResponse::default_allow()
384    }
385}
386
387// ============================================================================
388// gRPC Server Implementation
389// ============================================================================
390
391/// gRPC agent server for implementing external agents
392pub struct GrpcAgentServer {
393    /// Agent ID
394    id: String,
395    /// Request handler
396    handler: Arc<dyn AgentHandler>,
397}
398
399impl GrpcAgentServer {
400    /// Create a new gRPC agent server
401    pub fn new(id: impl Into<String>, handler: Box<dyn AgentHandler>) -> Self {
402        let id = id.into();
403        debug!(agent_id = %id, "Creating gRPC agent server");
404        Self {
405            id,
406            handler: Arc::from(handler),
407        }
408    }
409
410    /// Get the tonic service for this agent
411    pub fn into_service(self) -> AgentProcessorServer<GrpcAgentHandler> {
412        trace!(agent_id = %self.id, "Converting to tonic service");
413        AgentProcessorServer::new(GrpcAgentHandler {
414            id: self.id,
415            handler: self.handler,
416        })
417    }
418
419    /// Start the gRPC server on the given address
420    pub async fn run(self, addr: SocketAddr) -> Result<(), AgentProtocolError> {
421        info!(
422            agent_id = %self.id,
423            address = %addr,
424            "gRPC agent server listening"
425        );
426
427        tonic::transport::Server::builder()
428            .add_service(self.into_service())
429            .serve(addr)
430            .await
431            .map_err(|e| {
432                error!(error = %e, "gRPC server error");
433                AgentProtocolError::ConnectionFailed(format!("gRPC server error: {}", e))
434            })
435    }
436}
437
438/// Internal handler that implements the gRPC AgentProcessor trait
439pub struct GrpcAgentHandler {
440    id: String,
441    handler: Arc<dyn AgentHandler>,
442}
443
444#[tonic::async_trait]
445impl AgentProcessor for GrpcAgentHandler {
446    async fn process_event(
447        &self,
448        request: Request<grpc::AgentRequest>,
449    ) -> Result<Response<grpc::AgentResponse>, Status> {
450        let grpc_request = request.into_inner();
451
452        trace!(
453            agent_id = %self.id,
454            event_type = grpc_request.event_type,
455            version = grpc_request.version,
456            "Processing gRPC event"
457        );
458
459        // Convert gRPC event to internal event and dispatch
460        let response = match grpc_request.event {
461            Some(grpc::agent_request::Event::RequestHeaders(e)) => {
462                let event = Self::convert_request_headers_from_grpc(e);
463                trace!(
464                    agent_id = %self.id,
465                    correlation_id = %event.metadata.correlation_id,
466                    "Processing request_headers via gRPC"
467                );
468                self.handler.on_request_headers(event).await
469            }
470            Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
471                let event = Self::convert_request_body_chunk_from_grpc(e);
472                trace!(
473                    agent_id = %self.id,
474                    correlation_id = %event.correlation_id,
475                    "Processing request_body_chunk via gRPC"
476                );
477                self.handler.on_request_body_chunk(event).await
478            }
479            Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
480                let event = Self::convert_response_headers_from_grpc(e);
481                trace!(
482                    agent_id = %self.id,
483                    correlation_id = %event.correlation_id,
484                    "Processing response_headers via gRPC"
485                );
486                self.handler.on_response_headers(event).await
487            }
488            Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
489                let event = Self::convert_response_body_chunk_from_grpc(e);
490                trace!(
491                    agent_id = %self.id,
492                    correlation_id = %event.correlation_id,
493                    "Processing response_body_chunk via gRPC"
494                );
495                self.handler.on_response_body_chunk(event).await
496            }
497            Some(grpc::agent_request::Event::RequestComplete(e)) => {
498                let event = Self::convert_request_complete_from_grpc(e);
499                trace!(
500                    agent_id = %self.id,
501                    correlation_id = %event.correlation_id,
502                    "Processing request_complete via gRPC"
503                );
504                self.handler.on_request_complete(event).await
505            }
506            Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
507                let event = Self::convert_websocket_frame_from_grpc(e);
508                trace!(
509                    agent_id = %self.id,
510                    correlation_id = %event.correlation_id,
511                    opcode = %event.opcode,
512                    "Processing websocket_frame via gRPC"
513                );
514                self.handler.on_websocket_frame(event).await
515            }
516            None => {
517                warn!(agent_id = %self.id, "Missing event in gRPC request");
518                return Err(Status::invalid_argument("Missing event in request"));
519            }
520        };
521
522        trace!(
523            agent_id = %self.id,
524            decision = ?response.decision,
525            "Returning gRPC response"
526        );
527
528        // Convert internal response to gRPC response
529        let grpc_response = Self::convert_response_to_grpc(response);
530        Ok(Response::new(grpc_response))
531    }
532
533    async fn process_event_stream(
534        &self,
535        request: Request<Streaming<grpc::AgentRequest>>,
536    ) -> Result<Response<grpc::AgentResponse>, Status> {
537        let mut stream = request.into_inner();
538
539        trace!(agent_id = %self.id, "Processing gRPC event stream");
540
541        // Process all events in the stream, returning the final response
542        let mut final_response = AgentResponse::default_allow();
543        let mut event_count = 0u32;
544
545        while let Some(result) = stream.next().await {
546            let grpc_request = result.map_err(|e| {
547                error!(agent_id = %self.id, error = %e, "Stream error");
548                Status::internal(format!("Stream error: {}", e))
549            })?;
550
551            event_count += 1;
552            trace!(
553                agent_id = %self.id,
554                event_count = event_count,
555                "Processing stream event"
556            );
557
558            let response = match grpc_request.event {
559                Some(grpc::agent_request::Event::RequestHeaders(e)) => {
560                    let event = Self::convert_request_headers_from_grpc(e);
561                    self.handler.on_request_headers(event).await
562                }
563                Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
564                    let event = Self::convert_request_body_chunk_from_grpc(e);
565                    self.handler.on_request_body_chunk(event).await
566                }
567                Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
568                    let event = Self::convert_response_headers_from_grpc(e);
569                    self.handler.on_response_headers(event).await
570                }
571                Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
572                    let event = Self::convert_response_body_chunk_from_grpc(e);
573                    self.handler.on_response_body_chunk(event).await
574                }
575                Some(grpc::agent_request::Event::RequestComplete(e)) => {
576                    let event = Self::convert_request_complete_from_grpc(e);
577                    self.handler.on_request_complete(event).await
578                }
579                Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
580                    let event = Self::convert_websocket_frame_from_grpc(e);
581                    self.handler.on_websocket_frame(event).await
582                }
583                None => continue,
584            };
585
586            // If any event results in a block/redirect, that becomes the final response
587            if !matches!(response.decision, Decision::Allow) {
588                debug!(
589                    agent_id = %self.id,
590                    decision = ?response.decision,
591                    event_count = event_count,
592                    "Non-allow decision in stream, terminating early"
593                );
594                final_response = response;
595                break;
596            }
597            final_response = response;
598        }
599
600        trace!(
601            agent_id = %self.id,
602            event_count = event_count,
603            decision = ?final_response.decision,
604            "Stream processing complete"
605        );
606
607        let grpc_response = Self::convert_response_to_grpc(final_response);
608        Ok(Response::new(grpc_response))
609    }
610}
611
612impl GrpcAgentHandler {
613    /// Convert gRPC RequestHeadersEvent to internal format
614    fn convert_request_headers_from_grpc(e: grpc::RequestHeadersEvent) -> RequestHeadersEvent {
615        RequestHeadersEvent {
616            metadata: Self::convert_metadata_from_grpc(e.metadata),
617            method: e.method,
618            uri: e.uri,
619            headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
620        }
621    }
622
623    /// Convert gRPC RequestBodyChunkEvent to internal format
624    fn convert_request_body_chunk_from_grpc(
625        e: grpc::RequestBodyChunkEvent,
626    ) -> RequestBodyChunkEvent {
627        RequestBodyChunkEvent {
628            correlation_id: e.correlation_id,
629            data: String::from_utf8_lossy(&e.data).to_string(),
630            is_last: e.is_last,
631            total_size: e.total_size.map(|s| s as usize),
632            chunk_index: e.chunk_index,
633            bytes_received: e.bytes_received as usize,
634        }
635    }
636
637    /// Convert gRPC ResponseHeadersEvent to internal format
638    fn convert_response_headers_from_grpc(e: grpc::ResponseHeadersEvent) -> ResponseHeadersEvent {
639        ResponseHeadersEvent {
640            correlation_id: e.correlation_id,
641            status: e.status as u16,
642            headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
643        }
644    }
645
646    /// Convert gRPC ResponseBodyChunkEvent to internal format
647    fn convert_response_body_chunk_from_grpc(
648        e: grpc::ResponseBodyChunkEvent,
649    ) -> ResponseBodyChunkEvent {
650        ResponseBodyChunkEvent {
651            correlation_id: e.correlation_id,
652            data: String::from_utf8_lossy(&e.data).to_string(),
653            is_last: e.is_last,
654            total_size: e.total_size.map(|s| s as usize),
655            chunk_index: e.chunk_index,
656            bytes_sent: e.bytes_sent as usize,
657        }
658    }
659
660    /// Convert gRPC RequestCompleteEvent to internal format
661    fn convert_request_complete_from_grpc(e: grpc::RequestCompleteEvent) -> RequestCompleteEvent {
662        RequestCompleteEvent {
663            correlation_id: e.correlation_id,
664            status: e.status as u16,
665            duration_ms: e.duration_ms,
666            request_body_size: e.request_body_size as usize,
667            response_body_size: e.response_body_size as usize,
668            upstream_attempts: e.upstream_attempts,
669            error: e.error,
670        }
671    }
672
673    /// Convert gRPC WebSocketFrameEvent to internal format
674    fn convert_websocket_frame_from_grpc(e: grpc::WebSocketFrameEvent) -> WebSocketFrameEvent {
675        use base64::{engine::general_purpose::STANDARD, Engine as _};
676        WebSocketFrameEvent {
677            correlation_id: e.correlation_id,
678            opcode: e.opcode,
679            data: STANDARD.encode(&e.data),
680            client_to_server: e.client_to_server,
681            frame_index: e.frame_index,
682            fin: e.fin,
683            route_id: e.route_id,
684            client_ip: e.client_ip,
685        }
686    }
687
688    /// Convert gRPC metadata to internal format
689    fn convert_metadata_from_grpc(metadata: Option<grpc::RequestMetadata>) -> RequestMetadata {
690        match metadata {
691            Some(m) => RequestMetadata {
692                correlation_id: m.correlation_id,
693                request_id: m.request_id,
694                client_ip: m.client_ip,
695                client_port: m.client_port as u16,
696                server_name: m.server_name,
697                protocol: m.protocol,
698                tls_version: m.tls_version,
699                tls_cipher: m.tls_cipher,
700                route_id: m.route_id,
701                upstream_id: m.upstream_id,
702                timestamp: m.timestamp,
703            },
704            None => RequestMetadata {
705                correlation_id: String::new(),
706                request_id: String::new(),
707                client_ip: String::new(),
708                client_port: 0,
709                server_name: None,
710                protocol: String::new(),
711                tls_version: None,
712                tls_cipher: None,
713                route_id: None,
714                upstream_id: None,
715                timestamp: String::new(),
716            },
717        }
718    }
719
720    /// Convert internal response to gRPC format
721    fn convert_response_to_grpc(response: AgentResponse) -> grpc::AgentResponse {
722        let decision = match response.decision {
723            Decision::Allow => Some(grpc::agent_response::Decision::Allow(
724                grpc::AllowDecision {},
725            )),
726            Decision::Block {
727                status,
728                body,
729                headers,
730            } => Some(grpc::agent_response::Decision::Block(grpc::BlockDecision {
731                status: status as u32,
732                body,
733                headers: headers.unwrap_or_default(),
734            })),
735            Decision::Redirect { url, status } => Some(grpc::agent_response::Decision::Redirect(
736                grpc::RedirectDecision {
737                    url,
738                    status: status as u32,
739                },
740            )),
741            Decision::Challenge {
742                challenge_type,
743                params,
744            } => Some(grpc::agent_response::Decision::Challenge(
745                grpc::ChallengeDecision {
746                    challenge_type,
747                    params,
748                },
749            )),
750        };
751
752        let request_headers: Vec<grpc::HeaderOp> = response
753            .request_headers
754            .into_iter()
755            .map(Self::convert_header_op_to_grpc)
756            .collect();
757
758        let response_headers: Vec<grpc::HeaderOp> = response
759            .response_headers
760            .into_iter()
761            .map(Self::convert_header_op_to_grpc)
762            .collect();
763
764        let audit = Some(grpc::AuditMetadata {
765            tags: response.audit.tags,
766            rule_ids: response.audit.rule_ids,
767            confidence: response.audit.confidence,
768            reason_codes: response.audit.reason_codes,
769            custom: response
770                .audit
771                .custom
772                .into_iter()
773                .map(|(k, v)| (k, v.to_string()))
774                .collect(),
775        });
776
777        // Convert body mutations
778        let request_body_mutation = response.request_body_mutation.map(|m| grpc::BodyMutation {
779            data: m.data.map(|d| d.into_bytes()),
780            chunk_index: m.chunk_index,
781        });
782
783        let response_body_mutation = response.response_body_mutation.map(|m| grpc::BodyMutation {
784            data: m.data.map(|d| d.into_bytes()),
785            chunk_index: m.chunk_index,
786        });
787
788        // Convert WebSocket decision
789        let websocket_decision = response
790            .websocket_decision
791            .map(|ws_decision| match ws_decision {
792                WebSocketDecision::Allow => {
793                    grpc::agent_response::WebsocketDecision::WebsocketAllow(
794                        grpc::WebSocketAllowDecision {},
795                    )
796                }
797                WebSocketDecision::Drop => grpc::agent_response::WebsocketDecision::WebsocketDrop(
798                    grpc::WebSocketDropDecision {},
799                ),
800                WebSocketDecision::Close { code, reason } => {
801                    grpc::agent_response::WebsocketDecision::WebsocketClose(
802                        grpc::WebSocketCloseDecision {
803                            code: code as u32,
804                            reason,
805                        },
806                    )
807                }
808            });
809
810        grpc::AgentResponse {
811            version: PROTOCOL_VERSION,
812            decision,
813            request_headers,
814            response_headers,
815            routing_metadata: response.routing_metadata,
816            audit,
817            needs_more: response.needs_more,
818            request_body_mutation,
819            response_body_mutation,
820            websocket_decision,
821        }
822    }
823
824    /// Convert internal header operation to gRPC format
825    fn convert_header_op_to_grpc(op: HeaderOp) -> grpc::HeaderOp {
826        let operation = match op {
827            HeaderOp::Set { name, value } => {
828                Some(grpc::header_op::Operation::Set(grpc::SetHeader {
829                    name,
830                    value,
831                }))
832            }
833            HeaderOp::Add { name, value } => {
834                Some(grpc::header_op::Operation::Add(grpc::AddHeader {
835                    name,
836                    value,
837                }))
838            }
839            HeaderOp::Remove { name } => {
840                Some(grpc::header_op::Operation::Remove(grpc::RemoveHeader {
841                    name,
842                }))
843            }
844        };
845        grpc::HeaderOp { operation }
846    }
847}