sentinel_agent_protocol/
server.rs

1//! Agent server for implementing external agents.
2//!
3//! Supports two transport mechanisms:
4//! - Unix domain sockets (length-prefixed JSON)
5//! - gRPC (Protocol Buffers over HTTP/2)
6
7use async_trait::async_trait;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::net::{UnixListener, UnixStream};
12use tokio_stream::StreamExt;
13use tonic::{Request, Response, Status, Streaming};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::errors::AgentProtocolError;
17use crate::grpc::{
18    self, agent_processor_server::AgentProcessor, agent_processor_server::AgentProcessorServer,
19};
20use crate::protocol::{
21    AgentRequest, AgentResponse, AuditMetadata, ConfigureEvent, Decision, EventType, HeaderOp,
22    RequestBodyChunkEvent, RequestCompleteEvent, RequestHeadersEvent, RequestMetadata,
23    ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketDecision, WebSocketFrameEvent,
24    MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
25};
26
/// Agent server for testing and reference implementations
///
/// Listens on a Unix domain socket and forwards each decoded event to the
/// configured [`AgentHandler`]. The handler is shared across connections
/// through an `Arc`.
pub struct AgentServer {
    /// Agent ID (attached to log records emitted by the server)
    id: String,
    /// Unix socket path the listener is bound to when the server runs
    socket_path: std::path::PathBuf,
    /// Request handler shared by every accepted connection
    handler: Arc<dyn AgentHandler>,
}
36
/// Trait for implementing agent logic
///
/// Every callback has a default implementation, so an agent only needs to
/// override the events it cares about. All defaults return
/// `AgentResponse::default_allow()`, except `on_websocket_frame`, which
/// returns `AgentResponse::websocket_allow()`.
///
/// Implementations must be `Send + Sync` because a single handler instance is
/// shared between concurrently running connection tasks.
#[async_trait]
pub trait AgentHandler: Send + Sync {
    /// Handle a configure event
    ///
    /// Called once when the agent connects, before any request events.
    /// Use this to receive agent-specific configuration from the proxy.
    ///
    /// The default implementation accepts any configuration silently.
    /// Override this to parse and validate your agent's configuration.
    ///
    /// Within this file only the Unix-socket transport dispatches this
    /// callback; the gRPC event dispatch has no configure arm.
    async fn on_configure(&self, _event: ConfigureEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a request headers event
    ///
    /// The default implementation allows the request unchanged.
    async fn on_request_headers(&self, _event: RequestHeadersEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a request body chunk event
    ///
    /// The default implementation allows the chunk unchanged.
    async fn on_request_body_chunk(&self, _event: RequestBodyChunkEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a response headers event
    ///
    /// The default implementation allows the response unchanged.
    async fn on_response_headers(&self, _event: ResponseHeadersEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a response body chunk event
    ///
    /// The default implementation allows the chunk unchanged.
    async fn on_response_body_chunk(&self, _event: ResponseBodyChunkEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a request complete event
    ///
    /// The default implementation returns an allow response.
    async fn on_request_complete(&self, _event: RequestCompleteEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Handle a WebSocket frame event
    ///
    /// Called for each WebSocket frame when inspection is enabled.
    /// Return `AgentResponse::websocket_allow()` to forward the frame,
    /// `AgentResponse::websocket_drop()` to silently drop it, or
    /// `AgentResponse::websocket_close(code, reason)` to close the connection.
    async fn on_websocket_frame(&self, _event: WebSocketFrameEvent) -> AgentResponse {
        AgentResponse::websocket_allow()
    }
}
86
87impl AgentServer {
88    /// Create a new agent server
89    pub fn new(
90        id: impl Into<String>,
91        socket_path: impl Into<std::path::PathBuf>,
92        handler: Box<dyn AgentHandler>,
93    ) -> Self {
94        let id = id.into();
95        let socket_path = socket_path.into();
96
97        debug!(
98            agent_id = %id,
99            socket_path = %socket_path.display(),
100            "Creating agent server"
101        );
102
103        Self {
104            id,
105            socket_path,
106            handler: Arc::from(handler),
107        }
108    }
109
110    /// Start the agent server
111    pub async fn run(&self) -> Result<(), AgentProtocolError> {
112        // Remove existing socket file if it exists
113        if self.socket_path.exists() {
114            trace!(
115                agent_id = %self.id,
116                socket_path = %self.socket_path.display(),
117                "Removing existing socket file"
118            );
119            std::fs::remove_file(&self.socket_path)?;
120        }
121
122        // Create Unix socket listener
123        let listener = UnixListener::bind(&self.socket_path)?;
124
125        info!(
126            agent_id = %self.id,
127            socket_path = %self.socket_path.display(),
128            "Agent server listening"
129        );
130
131        loop {
132            match listener.accept().await {
133                Ok((stream, _addr)) => {
134                    trace!(
135                        agent_id = %self.id,
136                        "Accepted new connection"
137                    );
138                    let handler = Arc::clone(&self.handler);
139                    let agent_id = self.id.clone();
140                    tokio::spawn(async move {
141                        if let Err(e) = Self::handle_connection(stream, handler.as_ref()).await {
142                            error!(
143                                agent_id = %agent_id,
144                                error = %e,
145                                "Error handling agent connection"
146                            );
147                        }
148                    });
149                }
150                Err(e) => {
151                    error!(
152                        agent_id = %self.id,
153                        error = %e,
154                        "Failed to accept connection"
155                    );
156                }
157            }
158        }
159    }
160
161    /// Handle a single connection
162    async fn handle_connection(
163        mut stream: UnixStream,
164        handler: &dyn AgentHandler,
165    ) -> Result<(), AgentProtocolError> {
166        trace!("Starting connection handler");
167
168        loop {
169            // Read message length
170            let mut len_bytes = [0u8; 4];
171            match stream.read_exact(&mut len_bytes).await {
172                Ok(_) => {}
173                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
174                    // Client disconnected
175                    trace!("Client disconnected (EOF)");
176                    return Ok(());
177                }
178                Err(e) => {
179                    error!(error = %e, "Error reading message length");
180                    return Err(e.into());
181                }
182            }
183
184            let message_len = u32::from_be_bytes(len_bytes) as usize;
185
186            // Check message size
187            if message_len > MAX_MESSAGE_SIZE {
188                warn!(
189                    message_len = message_len,
190                    max_size = MAX_MESSAGE_SIZE,
191                    "Message too large"
192                );
193                return Err(AgentProtocolError::MessageTooLarge {
194                    size: message_len,
195                    max: MAX_MESSAGE_SIZE,
196                });
197            }
198
199            trace!(message_len = message_len, "Reading message data");
200
201            // Read message data
202            let mut buffer = vec![0u8; message_len];
203            stream.read_exact(&mut buffer).await?;
204
205            // Parse request
206            let request: AgentRequest = serde_json::from_slice(&buffer)
207                .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
208
209            trace!(
210                event_type = ?request.event_type,
211                version = request.version,
212                "Received agent request"
213            );
214
215            // Handle request based on event type
216            let response = match request.event_type {
217                EventType::Configure => {
218                    let event: ConfigureEvent = serde_json::from_value(request.payload)
219                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
220                    trace!(
221                        agent_id = %event.agent_id,
222                        "Processing configure event"
223                    );
224                    handler.on_configure(event).await
225                }
226                EventType::RequestHeaders => {
227                    let event: RequestHeadersEvent = serde_json::from_value(request.payload)
228                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
229                    trace!(
230                        correlation_id = %event.metadata.correlation_id,
231                        method = %event.method,
232                        uri = %event.uri,
233                        "Processing request_headers event"
234                    );
235                    handler.on_request_headers(event).await
236                }
237                EventType::RequestBodyChunk => {
238                    let event: RequestBodyChunkEvent = serde_json::from_value(request.payload)
239                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
240                    trace!(
241                        correlation_id = %event.correlation_id,
242                        is_last = event.is_last,
243                        data_len = event.data.len(),
244                        "Processing request_body_chunk event"
245                    );
246                    handler.on_request_body_chunk(event).await
247                }
248                EventType::ResponseHeaders => {
249                    let event: ResponseHeadersEvent = serde_json::from_value(request.payload)
250                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
251                    trace!(
252                        correlation_id = %event.correlation_id,
253                        status = event.status,
254                        "Processing response_headers event"
255                    );
256                    handler.on_response_headers(event).await
257                }
258                EventType::ResponseBodyChunk => {
259                    let event: ResponseBodyChunkEvent = serde_json::from_value(request.payload)
260                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
261                    trace!(
262                        correlation_id = %event.correlation_id,
263                        is_last = event.is_last,
264                        data_len = event.data.len(),
265                        "Processing response_body_chunk event"
266                    );
267                    handler.on_response_body_chunk(event).await
268                }
269                EventType::RequestComplete => {
270                    let event: RequestCompleteEvent = serde_json::from_value(request.payload)
271                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
272                    trace!(
273                        correlation_id = %event.correlation_id,
274                        status = event.status,
275                        duration_ms = event.duration_ms,
276                        "Processing request_complete event"
277                    );
278                    handler.on_request_complete(event).await
279                }
280                EventType::WebSocketFrame => {
281                    let event: WebSocketFrameEvent = serde_json::from_value(request.payload)
282                        .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
283                    trace!(
284                        correlation_id = %event.correlation_id,
285                        opcode = %event.opcode,
286                        frame_index = event.frame_index,
287                        client_to_server = event.client_to_server,
288                        "Processing websocket_frame event"
289                    );
290                    handler.on_websocket_frame(event).await
291                }
292            };
293
294            trace!(
295                decision = ?response.decision,
296                "Sending agent response"
297            );
298
299            // Send response
300            let response_bytes = serde_json::to_vec(&response)
301                .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?;
302
303            // Write message length
304            let len_bytes = (response_bytes.len() as u32).to_be_bytes();
305            stream.write_all(&len_bytes).await?;
306            // Write message data
307            stream.write_all(&response_bytes).await?;
308            stream.flush().await?;
309
310            trace!(response_len = response_bytes.len(), "Response sent");
311        }
312    }
313}
314
315/// Reference implementation: Echo agent (for testing)
316pub struct EchoAgent;
317
318#[async_trait]
319impl AgentHandler for EchoAgent {
320    async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
321        debug!(
322            "Echo agent: request headers for {}",
323            event.metadata.correlation_id
324        );
325
326        // Echo back correlation ID as a header
327        AgentResponse::default_allow()
328            .add_request_header(HeaderOp::Set {
329                name: "X-Echo-Agent".to_string(),
330                value: event.metadata.correlation_id.clone(),
331            })
332            .with_audit(AuditMetadata {
333                tags: vec!["echo".to_string()],
334                ..Default::default()
335            })
336    }
337}
338
339/// Reference implementation: Denylist agent
340pub struct DenylistAgent {
341    blocked_paths: Vec<String>,
342    blocked_ips: Vec<String>,
343}
344
345impl DenylistAgent {
346    pub fn new(blocked_paths: Vec<String>, blocked_ips: Vec<String>) -> Self {
347        Self {
348            blocked_paths,
349            blocked_ips,
350        }
351    }
352}
353
354#[async_trait]
355impl AgentHandler for DenylistAgent {
356    async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
357        trace!(
358            correlation_id = %event.metadata.correlation_id,
359            uri = %event.uri,
360            client_ip = %event.metadata.client_ip,
361            "Denylist agent checking request"
362        );
363
364        // Check if path is blocked
365        for blocked_path in &self.blocked_paths {
366            if event.uri.starts_with(blocked_path) {
367                debug!(
368                    correlation_id = %event.metadata.correlation_id,
369                    blocked_path = %blocked_path,
370                    uri = %event.uri,
371                    "Blocking request: path matched denylist"
372                );
373                return AgentResponse::block(403, Some("Forbidden path".to_string())).with_audit(
374                    AuditMetadata {
375                        tags: vec!["denylist".to_string(), "blocked_path".to_string()],
376                        reason_codes: vec!["PATH_BLOCKED".to_string()],
377                        ..Default::default()
378                    },
379                );
380            }
381        }
382
383        // Check if IP is blocked
384        if self.blocked_ips.contains(&event.metadata.client_ip) {
385            debug!(
386                correlation_id = %event.metadata.correlation_id,
387                client_ip = %event.metadata.client_ip,
388                "Blocking request: IP matched denylist"
389            );
390            return AgentResponse::block(403, Some("Forbidden IP".to_string())).with_audit(
391                AuditMetadata {
392                    tags: vec!["denylist".to_string(), "blocked_ip".to_string()],
393                    reason_codes: vec!["IP_BLOCKED".to_string()],
394                    ..Default::default()
395                },
396            );
397        }
398
399        trace!(
400            correlation_id = %event.metadata.correlation_id,
401            "Request allowed by denylist agent"
402        );
403        AgentResponse::default_allow()
404    }
405}
406
407// ============================================================================
408// gRPC Server Implementation
409// ============================================================================
410
411/// gRPC agent server for implementing external agents
412pub struct GrpcAgentServer {
413    /// Agent ID
414    id: String,
415    /// Request handler
416    handler: Arc<dyn AgentHandler>,
417}
418
419impl GrpcAgentServer {
420    /// Create a new gRPC agent server
421    pub fn new(id: impl Into<String>, handler: Box<dyn AgentHandler>) -> Self {
422        let id = id.into();
423        debug!(agent_id = %id, "Creating gRPC agent server");
424        Self {
425            id,
426            handler: Arc::from(handler),
427        }
428    }
429
430    /// Get the tonic service for this agent
431    pub fn into_service(self) -> AgentProcessorServer<GrpcAgentHandler> {
432        trace!(agent_id = %self.id, "Converting to tonic service");
433        AgentProcessorServer::new(GrpcAgentHandler {
434            id: self.id,
435            handler: self.handler,
436        })
437    }
438
439    /// Start the gRPC server on the given address
440    pub async fn run(self, addr: SocketAddr) -> Result<(), AgentProtocolError> {
441        info!(
442            agent_id = %self.id,
443            address = %addr,
444            "gRPC agent server listening"
445        );
446
447        tonic::transport::Server::builder()
448            .add_service(self.into_service())
449            .serve(addr)
450            .await
451            .map_err(|e| {
452                error!(error = %e, "gRPC server error");
453                AgentProtocolError::ConnectionFailed(format!("gRPC server error: {}", e))
454            })
455    }
456}
457
/// Internal handler that implements the gRPC AgentProcessor trait
///
/// Converts incoming gRPC messages into the crate's internal event types,
/// calls the wrapped [`AgentHandler`], and converts the response back.
pub struct GrpcAgentHandler {
    /// Agent ID (attached to log records)
    id: String,
    /// User-supplied agent logic
    handler: Arc<dyn AgentHandler>,
}
463
464#[tonic::async_trait]
465impl AgentProcessor for GrpcAgentHandler {
466    async fn process_event(
467        &self,
468        request: Request<grpc::AgentRequest>,
469    ) -> Result<Response<grpc::AgentResponse>, Status> {
470        let grpc_request = request.into_inner();
471
472        trace!(
473            agent_id = %self.id,
474            event_type = grpc_request.event_type,
475            version = grpc_request.version,
476            "Processing gRPC event"
477        );
478
479        // Convert gRPC event to internal event and dispatch
480        let response = match grpc_request.event {
481            Some(grpc::agent_request::Event::RequestHeaders(e)) => {
482                let event = Self::convert_request_headers_from_grpc(e);
483                trace!(
484                    agent_id = %self.id,
485                    correlation_id = %event.metadata.correlation_id,
486                    "Processing request_headers via gRPC"
487                );
488                self.handler.on_request_headers(event).await
489            }
490            Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
491                let event = Self::convert_request_body_chunk_from_grpc(e);
492                trace!(
493                    agent_id = %self.id,
494                    correlation_id = %event.correlation_id,
495                    "Processing request_body_chunk via gRPC"
496                );
497                self.handler.on_request_body_chunk(event).await
498            }
499            Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
500                let event = Self::convert_response_headers_from_grpc(e);
501                trace!(
502                    agent_id = %self.id,
503                    correlation_id = %event.correlation_id,
504                    "Processing response_headers via gRPC"
505                );
506                self.handler.on_response_headers(event).await
507            }
508            Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
509                let event = Self::convert_response_body_chunk_from_grpc(e);
510                trace!(
511                    agent_id = %self.id,
512                    correlation_id = %event.correlation_id,
513                    "Processing response_body_chunk via gRPC"
514                );
515                self.handler.on_response_body_chunk(event).await
516            }
517            Some(grpc::agent_request::Event::RequestComplete(e)) => {
518                let event = Self::convert_request_complete_from_grpc(e);
519                trace!(
520                    agent_id = %self.id,
521                    correlation_id = %event.correlation_id,
522                    "Processing request_complete via gRPC"
523                );
524                self.handler.on_request_complete(event).await
525            }
526            Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
527                let event = Self::convert_websocket_frame_from_grpc(e);
528                trace!(
529                    agent_id = %self.id,
530                    correlation_id = %event.correlation_id,
531                    opcode = %event.opcode,
532                    "Processing websocket_frame via gRPC"
533                );
534                self.handler.on_websocket_frame(event).await
535            }
536            None => {
537                warn!(agent_id = %self.id, "Missing event in gRPC request");
538                return Err(Status::invalid_argument("Missing event in request"));
539            }
540        };
541
542        trace!(
543            agent_id = %self.id,
544            decision = ?response.decision,
545            "Returning gRPC response"
546        );
547
548        // Convert internal response to gRPC response
549        let grpc_response = Self::convert_response_to_grpc(response);
550        Ok(Response::new(grpc_response))
551    }
552
553    async fn process_event_stream(
554        &self,
555        request: Request<Streaming<grpc::AgentRequest>>,
556    ) -> Result<Response<grpc::AgentResponse>, Status> {
557        let mut stream = request.into_inner();
558
559        trace!(agent_id = %self.id, "Processing gRPC event stream");
560
561        // Process all events in the stream, returning the final response
562        let mut final_response = AgentResponse::default_allow();
563        let mut event_count = 0u32;
564
565        while let Some(result) = stream.next().await {
566            let grpc_request = result.map_err(|e| {
567                error!(agent_id = %self.id, error = %e, "Stream error");
568                Status::internal(format!("Stream error: {}", e))
569            })?;
570
571            event_count += 1;
572            trace!(
573                agent_id = %self.id,
574                event_count = event_count,
575                "Processing stream event"
576            );
577
578            let response = match grpc_request.event {
579                Some(grpc::agent_request::Event::RequestHeaders(e)) => {
580                    let event = Self::convert_request_headers_from_grpc(e);
581                    self.handler.on_request_headers(event).await
582                }
583                Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
584                    let event = Self::convert_request_body_chunk_from_grpc(e);
585                    self.handler.on_request_body_chunk(event).await
586                }
587                Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
588                    let event = Self::convert_response_headers_from_grpc(e);
589                    self.handler.on_response_headers(event).await
590                }
591                Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
592                    let event = Self::convert_response_body_chunk_from_grpc(e);
593                    self.handler.on_response_body_chunk(event).await
594                }
595                Some(grpc::agent_request::Event::RequestComplete(e)) => {
596                    let event = Self::convert_request_complete_from_grpc(e);
597                    self.handler.on_request_complete(event).await
598                }
599                Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
600                    let event = Self::convert_websocket_frame_from_grpc(e);
601                    self.handler.on_websocket_frame(event).await
602                }
603                None => continue,
604            };
605
606            // If any event results in a block/redirect, that becomes the final response
607            if !matches!(response.decision, Decision::Allow) {
608                debug!(
609                    agent_id = %self.id,
610                    decision = ?response.decision,
611                    event_count = event_count,
612                    "Non-allow decision in stream, terminating early"
613                );
614                final_response = response;
615                break;
616            }
617            final_response = response;
618        }
619
620        trace!(
621            agent_id = %self.id,
622            event_count = event_count,
623            decision = ?final_response.decision,
624            "Stream processing complete"
625        );
626
627        let grpc_response = Self::convert_response_to_grpc(final_response);
628        Ok(Response::new(grpc_response))
629    }
630}
631
632impl GrpcAgentHandler {
633    /// Convert gRPC RequestHeadersEvent to internal format
634    fn convert_request_headers_from_grpc(e: grpc::RequestHeadersEvent) -> RequestHeadersEvent {
635        RequestHeadersEvent {
636            metadata: Self::convert_metadata_from_grpc(e.metadata),
637            method: e.method,
638            uri: e.uri,
639            headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
640        }
641    }
642
643    /// Convert gRPC RequestBodyChunkEvent to internal format
644    fn convert_request_body_chunk_from_grpc(
645        e: grpc::RequestBodyChunkEvent,
646    ) -> RequestBodyChunkEvent {
647        RequestBodyChunkEvent {
648            correlation_id: e.correlation_id,
649            data: String::from_utf8_lossy(&e.data).to_string(),
650            is_last: e.is_last,
651            total_size: e.total_size.map(|s| s as usize),
652            chunk_index: e.chunk_index,
653            bytes_received: e.bytes_received as usize,
654        }
655    }
656
657    /// Convert gRPC ResponseHeadersEvent to internal format
658    fn convert_response_headers_from_grpc(e: grpc::ResponseHeadersEvent) -> ResponseHeadersEvent {
659        ResponseHeadersEvent {
660            correlation_id: e.correlation_id,
661            status: e.status as u16,
662            headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
663        }
664    }
665
666    /// Convert gRPC ResponseBodyChunkEvent to internal format
667    fn convert_response_body_chunk_from_grpc(
668        e: grpc::ResponseBodyChunkEvent,
669    ) -> ResponseBodyChunkEvent {
670        ResponseBodyChunkEvent {
671            correlation_id: e.correlation_id,
672            data: String::from_utf8_lossy(&e.data).to_string(),
673            is_last: e.is_last,
674            total_size: e.total_size.map(|s| s as usize),
675            chunk_index: e.chunk_index,
676            bytes_sent: e.bytes_sent as usize,
677        }
678    }
679
680    /// Convert gRPC RequestCompleteEvent to internal format
681    fn convert_request_complete_from_grpc(e: grpc::RequestCompleteEvent) -> RequestCompleteEvent {
682        RequestCompleteEvent {
683            correlation_id: e.correlation_id,
684            status: e.status as u16,
685            duration_ms: e.duration_ms,
686            request_body_size: e.request_body_size as usize,
687            response_body_size: e.response_body_size as usize,
688            upstream_attempts: e.upstream_attempts,
689            error: e.error,
690        }
691    }
692
693    /// Convert gRPC WebSocketFrameEvent to internal format
694    fn convert_websocket_frame_from_grpc(e: grpc::WebSocketFrameEvent) -> WebSocketFrameEvent {
695        use base64::{engine::general_purpose::STANDARD, Engine as _};
696        WebSocketFrameEvent {
697            correlation_id: e.correlation_id,
698            opcode: e.opcode,
699            data: STANDARD.encode(&e.data),
700            client_to_server: e.client_to_server,
701            frame_index: e.frame_index,
702            fin: e.fin,
703            route_id: e.route_id,
704            client_ip: e.client_ip,
705        }
706    }
707
708    /// Convert gRPC metadata to internal format
709    fn convert_metadata_from_grpc(metadata: Option<grpc::RequestMetadata>) -> RequestMetadata {
710        match metadata {
711            Some(m) => RequestMetadata {
712                correlation_id: m.correlation_id,
713                request_id: m.request_id,
714                client_ip: m.client_ip,
715                client_port: m.client_port as u16,
716                server_name: m.server_name,
717                protocol: m.protocol,
718                tls_version: m.tls_version,
719                tls_cipher: m.tls_cipher,
720                route_id: m.route_id,
721                upstream_id: m.upstream_id,
722                timestamp: m.timestamp,
723            },
724            None => RequestMetadata {
725                correlation_id: String::new(),
726                request_id: String::new(),
727                client_ip: String::new(),
728                client_port: 0,
729                server_name: None,
730                protocol: String::new(),
731                tls_version: None,
732                tls_cipher: None,
733                route_id: None,
734                upstream_id: None,
735                timestamp: String::new(),
736            },
737        }
738    }
739
740    /// Convert internal response to gRPC format
741    fn convert_response_to_grpc(response: AgentResponse) -> grpc::AgentResponse {
742        let decision = match response.decision {
743            Decision::Allow => Some(grpc::agent_response::Decision::Allow(
744                grpc::AllowDecision {},
745            )),
746            Decision::Block {
747                status,
748                body,
749                headers,
750            } => Some(grpc::agent_response::Decision::Block(grpc::BlockDecision {
751                status: status as u32,
752                body,
753                headers: headers.unwrap_or_default(),
754            })),
755            Decision::Redirect { url, status } => Some(grpc::agent_response::Decision::Redirect(
756                grpc::RedirectDecision {
757                    url,
758                    status: status as u32,
759                },
760            )),
761            Decision::Challenge {
762                challenge_type,
763                params,
764            } => Some(grpc::agent_response::Decision::Challenge(
765                grpc::ChallengeDecision {
766                    challenge_type,
767                    params,
768                },
769            )),
770        };
771
772        let request_headers: Vec<grpc::HeaderOp> = response
773            .request_headers
774            .into_iter()
775            .map(Self::convert_header_op_to_grpc)
776            .collect();
777
778        let response_headers: Vec<grpc::HeaderOp> = response
779            .response_headers
780            .into_iter()
781            .map(Self::convert_header_op_to_grpc)
782            .collect();
783
784        let audit = Some(grpc::AuditMetadata {
785            tags: response.audit.tags,
786            rule_ids: response.audit.rule_ids,
787            confidence: response.audit.confidence,
788            reason_codes: response.audit.reason_codes,
789            custom: response
790                .audit
791                .custom
792                .into_iter()
793                .map(|(k, v)| (k, v.to_string()))
794                .collect(),
795        });
796
797        // Convert body mutations
798        let request_body_mutation = response.request_body_mutation.map(|m| grpc::BodyMutation {
799            data: m.data.map(|d| d.into_bytes()),
800            chunk_index: m.chunk_index,
801        });
802
803        let response_body_mutation = response.response_body_mutation.map(|m| grpc::BodyMutation {
804            data: m.data.map(|d| d.into_bytes()),
805            chunk_index: m.chunk_index,
806        });
807
808        // Convert WebSocket decision
809        let websocket_decision = response
810            .websocket_decision
811            .map(|ws_decision| match ws_decision {
812                WebSocketDecision::Allow => {
813                    grpc::agent_response::WebsocketDecision::WebsocketAllow(
814                        grpc::WebSocketAllowDecision {},
815                    )
816                }
817                WebSocketDecision::Drop => grpc::agent_response::WebsocketDecision::WebsocketDrop(
818                    grpc::WebSocketDropDecision {},
819                ),
820                WebSocketDecision::Close { code, reason } => {
821                    grpc::agent_response::WebsocketDecision::WebsocketClose(
822                        grpc::WebSocketCloseDecision {
823                            code: code as u32,
824                            reason,
825                        },
826                    )
827                }
828            });
829
830        grpc::AgentResponse {
831            version: PROTOCOL_VERSION,
832            decision,
833            request_headers,
834            response_headers,
835            routing_metadata: response.routing_metadata,
836            audit,
837            needs_more: response.needs_more,
838            request_body_mutation,
839            response_body_mutation,
840            websocket_decision,
841        }
842    }
843
844    /// Convert internal header operation to gRPC format
845    fn convert_header_op_to_grpc(op: HeaderOp) -> grpc::HeaderOp {
846        let operation = match op {
847            HeaderOp::Set { name, value } => {
848                Some(grpc::header_op::Operation::Set(grpc::SetHeader {
849                    name,
850                    value,
851                }))
852            }
853            HeaderOp::Add { name, value } => {
854                Some(grpc::header_op::Operation::Add(grpc::AddHeader {
855                    name,
856                    value,
857                }))
858            }
859            HeaderOp::Remove { name } => {
860                Some(grpc::header_op::Operation::Remove(grpc::RemoveHeader {
861                    name,
862                }))
863            }
864        };
865        grpc::HeaderOp { operation }
866    }
867}