1use async_trait::async_trait;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::net::{UnixListener, UnixStream};
12use tokio_stream::StreamExt;
13use tonic::{Request, Response, Status, Streaming};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::errors::AgentProtocolError;
17use crate::grpc::{
18 self, agent_processor_server::AgentProcessor, agent_processor_server::AgentProcessorServer,
19};
20use crate::protocol::{
21 AgentRequest, AgentResponse, AuditMetadata, ConfigureEvent, Decision, EventType,
22 GuardrailInspectEvent, HeaderOp, RequestBodyChunkEvent, RequestCompleteEvent,
23 RequestHeadersEvent, RequestMetadata, ResponseBodyChunkEvent, ResponseHeadersEvent,
24 WebSocketDecision, WebSocketFrameEvent, MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
25};
26
27pub struct AgentServer {
29 id: String,
31 socket_path: std::path::PathBuf,
33 handler: Arc<dyn AgentHandler>,
35}
36
37#[async_trait]
39pub trait AgentHandler: Send + Sync {
40 async fn on_configure(&self, _event: ConfigureEvent) -> AgentResponse {
48 AgentResponse::default_allow()
49 }
50
51 async fn on_request_headers(&self, _event: RequestHeadersEvent) -> AgentResponse {
53 AgentResponse::default_allow()
54 }
55
56 async fn on_request_body_chunk(&self, _event: RequestBodyChunkEvent) -> AgentResponse {
58 AgentResponse::default_allow()
59 }
60
61 async fn on_response_headers(&self, _event: ResponseHeadersEvent) -> AgentResponse {
63 AgentResponse::default_allow()
64 }
65
66 async fn on_response_body_chunk(&self, _event: ResponseBodyChunkEvent) -> AgentResponse {
68 AgentResponse::default_allow()
69 }
70
71 async fn on_request_complete(&self, _event: RequestCompleteEvent) -> AgentResponse {
73 AgentResponse::default_allow()
74 }
75
76 async fn on_websocket_frame(&self, _event: WebSocketFrameEvent) -> AgentResponse {
83 AgentResponse::websocket_allow()
84 }
85
86 async fn on_guardrail_inspect(&self, _event: GuardrailInspectEvent) -> AgentResponse {
93 AgentResponse::default_allow()
94 }
95}
96
97impl AgentServer {
98 pub fn new(
100 id: impl Into<String>,
101 socket_path: impl Into<std::path::PathBuf>,
102 handler: Box<dyn AgentHandler>,
103 ) -> Self {
104 let id = id.into();
105 let socket_path = socket_path.into();
106
107 debug!(
108 agent_id = %id,
109 socket_path = %socket_path.display(),
110 "Creating agent server"
111 );
112
113 Self {
114 id,
115 socket_path,
116 handler: Arc::from(handler),
117 }
118 }
119
120 pub async fn run(&self) -> Result<(), AgentProtocolError> {
122 if self.socket_path.exists() {
124 trace!(
125 agent_id = %self.id,
126 socket_path = %self.socket_path.display(),
127 "Removing existing socket file"
128 );
129 std::fs::remove_file(&self.socket_path)?;
130 }
131
132 let listener = UnixListener::bind(&self.socket_path)?;
134
135 info!(
136 agent_id = %self.id,
137 socket_path = %self.socket_path.display(),
138 "Agent server listening"
139 );
140
141 loop {
142 match listener.accept().await {
143 Ok((stream, _addr)) => {
144 trace!(
145 agent_id = %self.id,
146 "Accepted new connection"
147 );
148 let handler = Arc::clone(&self.handler);
149 let agent_id = self.id.clone();
150 tokio::spawn(async move {
151 if let Err(e) = Self::handle_connection(stream, handler.as_ref()).await {
152 error!(
153 agent_id = %agent_id,
154 error = %e,
155 "Error handling agent connection"
156 );
157 }
158 });
159 }
160 Err(e) => {
161 error!(
162 agent_id = %self.id,
163 error = %e,
164 "Failed to accept connection"
165 );
166 }
167 }
168 }
169 }
170
171 async fn handle_connection(
173 mut stream: UnixStream,
174 handler: &dyn AgentHandler,
175 ) -> Result<(), AgentProtocolError> {
176 trace!("Starting connection handler");
177
178 loop {
179 let mut len_bytes = [0u8; 4];
181 match stream.read_exact(&mut len_bytes).await {
182 Ok(_) => {}
183 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
184 trace!("Client disconnected (EOF)");
186 return Ok(());
187 }
188 Err(e) => {
189 error!(error = %e, "Error reading message length");
190 return Err(e.into());
191 }
192 }
193
194 let message_len = u32::from_be_bytes(len_bytes) as usize;
195
196 if message_len > MAX_MESSAGE_SIZE {
198 warn!(
199 message_len = message_len,
200 max_size = MAX_MESSAGE_SIZE,
201 "Message too large"
202 );
203 return Err(AgentProtocolError::MessageTooLarge {
204 size: message_len,
205 max: MAX_MESSAGE_SIZE,
206 });
207 }
208
209 trace!(message_len = message_len, "Reading message data");
210
211 let mut buffer = vec![0u8; message_len];
213 stream.read_exact(&mut buffer).await?;
214
215 let request: AgentRequest = serde_json::from_slice(&buffer)
217 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
218
219 trace!(
220 event_type = ?request.event_type,
221 version = request.version,
222 "Received agent request"
223 );
224
225 let response = match request.event_type {
227 EventType::Configure => {
228 let event: ConfigureEvent = serde_json::from_value(request.payload)
229 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
230 trace!(
231 agent_id = %event.agent_id,
232 "Processing configure event"
233 );
234 handler.on_configure(event).await
235 }
236 EventType::RequestHeaders => {
237 let event: RequestHeadersEvent = serde_json::from_value(request.payload)
238 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
239 trace!(
240 correlation_id = %event.metadata.correlation_id,
241 method = %event.method,
242 uri = %event.uri,
243 "Processing request_headers event"
244 );
245 handler.on_request_headers(event).await
246 }
247 EventType::RequestBodyChunk => {
248 let event: RequestBodyChunkEvent = serde_json::from_value(request.payload)
249 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
250 trace!(
251 correlation_id = %event.correlation_id,
252 is_last = event.is_last,
253 data_len = event.data.len(),
254 "Processing request_body_chunk event"
255 );
256 handler.on_request_body_chunk(event).await
257 }
258 EventType::ResponseHeaders => {
259 let event: ResponseHeadersEvent = serde_json::from_value(request.payload)
260 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
261 trace!(
262 correlation_id = %event.correlation_id,
263 status = event.status,
264 "Processing response_headers event"
265 );
266 handler.on_response_headers(event).await
267 }
268 EventType::ResponseBodyChunk => {
269 let event: ResponseBodyChunkEvent = serde_json::from_value(request.payload)
270 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
271 trace!(
272 correlation_id = %event.correlation_id,
273 is_last = event.is_last,
274 data_len = event.data.len(),
275 "Processing response_body_chunk event"
276 );
277 handler.on_response_body_chunk(event).await
278 }
279 EventType::RequestComplete => {
280 let event: RequestCompleteEvent = serde_json::from_value(request.payload)
281 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
282 trace!(
283 correlation_id = %event.correlation_id,
284 status = event.status,
285 duration_ms = event.duration_ms,
286 "Processing request_complete event"
287 );
288 handler.on_request_complete(event).await
289 }
290 EventType::WebSocketFrame => {
291 let event: WebSocketFrameEvent = serde_json::from_value(request.payload)
292 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
293 trace!(
294 correlation_id = %event.correlation_id,
295 opcode = %event.opcode,
296 frame_index = event.frame_index,
297 client_to_server = event.client_to_server,
298 "Processing websocket_frame event"
299 );
300 handler.on_websocket_frame(event).await
301 }
302 EventType::GuardrailInspect => {
303 let event: GuardrailInspectEvent = serde_json::from_value(request.payload)
304 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
305 trace!(
306 correlation_id = %event.correlation_id,
307 inspection_type = ?event.inspection_type,
308 content_len = event.content.len(),
309 "Processing guardrail_inspect event"
310 );
311 handler.on_guardrail_inspect(event).await
312 }
313 };
314
315 trace!(
316 decision = ?response.decision,
317 "Sending agent response"
318 );
319
320 let response_bytes = serde_json::to_vec(&response)
322 .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?;
323
324 let len_bytes = (response_bytes.len() as u32).to_be_bytes();
326 stream.write_all(&len_bytes).await?;
327 stream.write_all(&response_bytes).await?;
329 stream.flush().await?;
330
331 trace!(response_len = response_bytes.len(), "Response sent");
332 }
333 }
334}
335
336pub struct EchoAgent;
338
339#[async_trait]
340impl AgentHandler for EchoAgent {
341 async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
342 debug!(
343 "Echo agent: request headers for {}",
344 event.metadata.correlation_id
345 );
346
347 AgentResponse::default_allow()
349 .add_request_header(HeaderOp::Set {
350 name: "X-Echo-Agent".to_string(),
351 value: event.metadata.correlation_id.clone(),
352 })
353 .with_audit(AuditMetadata {
354 tags: vec!["echo".to_string()],
355 ..Default::default()
356 })
357 }
358}
359
360pub struct DenylistAgent {
362 blocked_paths: Vec<String>,
363 blocked_ips: Vec<String>,
364}
365
366impl DenylistAgent {
367 pub fn new(blocked_paths: Vec<String>, blocked_ips: Vec<String>) -> Self {
368 Self {
369 blocked_paths,
370 blocked_ips,
371 }
372 }
373}
374
375#[async_trait]
376impl AgentHandler for DenylistAgent {
377 async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
378 trace!(
379 correlation_id = %event.metadata.correlation_id,
380 uri = %event.uri,
381 client_ip = %event.metadata.client_ip,
382 "Denylist agent checking request"
383 );
384
385 for blocked_path in &self.blocked_paths {
387 if event.uri.starts_with(blocked_path) {
388 debug!(
389 correlation_id = %event.metadata.correlation_id,
390 blocked_path = %blocked_path,
391 uri = %event.uri,
392 "Blocking request: path matched denylist"
393 );
394 return AgentResponse::block(403, Some("Forbidden path".to_string())).with_audit(
395 AuditMetadata {
396 tags: vec!["denylist".to_string(), "blocked_path".to_string()],
397 reason_codes: vec!["PATH_BLOCKED".to_string()],
398 ..Default::default()
399 },
400 );
401 }
402 }
403
404 if self.blocked_ips.contains(&event.metadata.client_ip) {
406 debug!(
407 correlation_id = %event.metadata.correlation_id,
408 client_ip = %event.metadata.client_ip,
409 "Blocking request: IP matched denylist"
410 );
411 return AgentResponse::block(403, Some("Forbidden IP".to_string())).with_audit(
412 AuditMetadata {
413 tags: vec!["denylist".to_string(), "blocked_ip".to_string()],
414 reason_codes: vec!["IP_BLOCKED".to_string()],
415 ..Default::default()
416 },
417 );
418 }
419
420 trace!(
421 correlation_id = %event.metadata.correlation_id,
422 "Request allowed by denylist agent"
423 );
424 AgentResponse::default_allow()
425 }
426}
427
428pub struct GrpcAgentServer {
434 id: String,
436 handler: Arc<dyn AgentHandler>,
438}
439
440impl GrpcAgentServer {
441 pub fn new(id: impl Into<String>, handler: Box<dyn AgentHandler>) -> Self {
443 let id = id.into();
444 debug!(agent_id = %id, "Creating gRPC agent server");
445 Self {
446 id,
447 handler: Arc::from(handler),
448 }
449 }
450
451 pub fn into_service(self) -> AgentProcessorServer<GrpcAgentHandler> {
453 trace!(agent_id = %self.id, "Converting to tonic service");
454 AgentProcessorServer::new(GrpcAgentHandler {
455 id: self.id,
456 handler: self.handler,
457 })
458 }
459
460 pub async fn run(self, addr: SocketAddr) -> Result<(), AgentProtocolError> {
462 info!(
463 agent_id = %self.id,
464 address = %addr,
465 "gRPC agent server listening"
466 );
467
468 tonic::transport::Server::builder()
469 .add_service(self.into_service())
470 .serve(addr)
471 .await
472 .map_err(|e| {
473 error!(error = %e, "gRPC server error");
474 AgentProtocolError::ConnectionFailed(format!("gRPC server error: {}", e))
475 })
476 }
477}
478
479pub struct GrpcAgentHandler {
481 id: String,
482 handler: Arc<dyn AgentHandler>,
483}
484
485#[tonic::async_trait]
486impl AgentProcessor for GrpcAgentHandler {
487 async fn process_event(
488 &self,
489 request: Request<grpc::AgentRequest>,
490 ) -> Result<Response<grpc::AgentResponse>, Status> {
491 let grpc_request = request.into_inner();
492
493 trace!(
494 agent_id = %self.id,
495 event_type = grpc_request.event_type,
496 version = grpc_request.version,
497 "Processing gRPC event"
498 );
499
500 let response = match grpc_request.event {
502 Some(grpc::agent_request::Event::RequestHeaders(e)) => {
503 let event = Self::convert_request_headers_from_grpc(e);
504 trace!(
505 agent_id = %self.id,
506 correlation_id = %event.metadata.correlation_id,
507 "Processing request_headers via gRPC"
508 );
509 self.handler.on_request_headers(event).await
510 }
511 Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
512 let event = Self::convert_request_body_chunk_from_grpc(e);
513 trace!(
514 agent_id = %self.id,
515 correlation_id = %event.correlation_id,
516 "Processing request_body_chunk via gRPC"
517 );
518 self.handler.on_request_body_chunk(event).await
519 }
520 Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
521 let event = Self::convert_response_headers_from_grpc(e);
522 trace!(
523 agent_id = %self.id,
524 correlation_id = %event.correlation_id,
525 "Processing response_headers via gRPC"
526 );
527 self.handler.on_response_headers(event).await
528 }
529 Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
530 let event = Self::convert_response_body_chunk_from_grpc(e);
531 trace!(
532 agent_id = %self.id,
533 correlation_id = %event.correlation_id,
534 "Processing response_body_chunk via gRPC"
535 );
536 self.handler.on_response_body_chunk(event).await
537 }
538 Some(grpc::agent_request::Event::RequestComplete(e)) => {
539 let event = Self::convert_request_complete_from_grpc(e);
540 trace!(
541 agent_id = %self.id,
542 correlation_id = %event.correlation_id,
543 "Processing request_complete via gRPC"
544 );
545 self.handler.on_request_complete(event).await
546 }
547 Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
548 let event = Self::convert_websocket_frame_from_grpc(e);
549 trace!(
550 agent_id = %self.id,
551 correlation_id = %event.correlation_id,
552 opcode = %event.opcode,
553 "Processing websocket_frame via gRPC"
554 );
555 self.handler.on_websocket_frame(event).await
556 }
557 None => {
558 warn!(agent_id = %self.id, "Missing event in gRPC request");
559 return Err(Status::invalid_argument("Missing event in request"));
560 }
561 };
562
563 trace!(
564 agent_id = %self.id,
565 decision = ?response.decision,
566 "Returning gRPC response"
567 );
568
569 let grpc_response = Self::convert_response_to_grpc(response);
571 Ok(Response::new(grpc_response))
572 }
573
574 async fn process_event_stream(
575 &self,
576 request: Request<Streaming<grpc::AgentRequest>>,
577 ) -> Result<Response<grpc::AgentResponse>, Status> {
578 let mut stream = request.into_inner();
579
580 trace!(agent_id = %self.id, "Processing gRPC event stream");
581
582 let mut final_response = AgentResponse::default_allow();
584 let mut event_count = 0u32;
585
586 while let Some(result) = stream.next().await {
587 let grpc_request = result.map_err(|e| {
588 error!(agent_id = %self.id, error = %e, "Stream error");
589 Status::internal(format!("Stream error: {}", e))
590 })?;
591
592 event_count += 1;
593 trace!(
594 agent_id = %self.id,
595 event_count = event_count,
596 "Processing stream event"
597 );
598
599 let response = match grpc_request.event {
600 Some(grpc::agent_request::Event::RequestHeaders(e)) => {
601 let event = Self::convert_request_headers_from_grpc(e);
602 self.handler.on_request_headers(event).await
603 }
604 Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
605 let event = Self::convert_request_body_chunk_from_grpc(e);
606 self.handler.on_request_body_chunk(event).await
607 }
608 Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
609 let event = Self::convert_response_headers_from_grpc(e);
610 self.handler.on_response_headers(event).await
611 }
612 Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
613 let event = Self::convert_response_body_chunk_from_grpc(e);
614 self.handler.on_response_body_chunk(event).await
615 }
616 Some(grpc::agent_request::Event::RequestComplete(e)) => {
617 let event = Self::convert_request_complete_from_grpc(e);
618 self.handler.on_request_complete(event).await
619 }
620 Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
621 let event = Self::convert_websocket_frame_from_grpc(e);
622 self.handler.on_websocket_frame(event).await
623 }
624 None => continue,
625 };
626
627 if !matches!(response.decision, Decision::Allow) {
629 debug!(
630 agent_id = %self.id,
631 decision = ?response.decision,
632 event_count = event_count,
633 "Non-allow decision in stream, terminating early"
634 );
635 final_response = response;
636 break;
637 }
638 final_response = response;
639 }
640
641 trace!(
642 agent_id = %self.id,
643 event_count = event_count,
644 decision = ?final_response.decision,
645 "Stream processing complete"
646 );
647
648 let grpc_response = Self::convert_response_to_grpc(final_response);
649 Ok(Response::new(grpc_response))
650 }
651}
652
653impl GrpcAgentHandler {
654 fn convert_request_headers_from_grpc(e: grpc::RequestHeadersEvent) -> RequestHeadersEvent {
656 RequestHeadersEvent {
657 metadata: Self::convert_metadata_from_grpc(e.metadata),
658 method: e.method,
659 uri: e.uri,
660 headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
661 }
662 }
663
664 fn convert_request_body_chunk_from_grpc(
666 e: grpc::RequestBodyChunkEvent,
667 ) -> RequestBodyChunkEvent {
668 RequestBodyChunkEvent {
669 correlation_id: e.correlation_id,
670 data: String::from_utf8_lossy(&e.data).to_string(),
671 is_last: e.is_last,
672 total_size: e.total_size.map(|s| s as usize),
673 chunk_index: e.chunk_index,
674 bytes_received: e.bytes_received as usize,
675 }
676 }
677
678 fn convert_response_headers_from_grpc(e: grpc::ResponseHeadersEvent) -> ResponseHeadersEvent {
680 ResponseHeadersEvent {
681 correlation_id: e.correlation_id,
682 status: e.status as u16,
683 headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
684 }
685 }
686
687 fn convert_response_body_chunk_from_grpc(
689 e: grpc::ResponseBodyChunkEvent,
690 ) -> ResponseBodyChunkEvent {
691 ResponseBodyChunkEvent {
692 correlation_id: e.correlation_id,
693 data: String::from_utf8_lossy(&e.data).to_string(),
694 is_last: e.is_last,
695 total_size: e.total_size.map(|s| s as usize),
696 chunk_index: e.chunk_index,
697 bytes_sent: e.bytes_sent as usize,
698 }
699 }
700
701 fn convert_request_complete_from_grpc(e: grpc::RequestCompleteEvent) -> RequestCompleteEvent {
703 RequestCompleteEvent {
704 correlation_id: e.correlation_id,
705 status: e.status as u16,
706 duration_ms: e.duration_ms,
707 request_body_size: e.request_body_size as usize,
708 response_body_size: e.response_body_size as usize,
709 upstream_attempts: e.upstream_attempts,
710 error: e.error,
711 }
712 }
713
714 fn convert_websocket_frame_from_grpc(e: grpc::WebSocketFrameEvent) -> WebSocketFrameEvent {
716 use base64::{engine::general_purpose::STANDARD, Engine as _};
717 WebSocketFrameEvent {
718 correlation_id: e.correlation_id,
719 opcode: e.opcode,
720 data: STANDARD.encode(&e.data),
721 client_to_server: e.client_to_server,
722 frame_index: e.frame_index,
723 fin: e.fin,
724 route_id: e.route_id,
725 client_ip: e.client_ip,
726 }
727 }
728
729 fn convert_metadata_from_grpc(metadata: Option<grpc::RequestMetadata>) -> RequestMetadata {
731 match metadata {
732 Some(m) => RequestMetadata {
733 correlation_id: m.correlation_id,
734 request_id: m.request_id,
735 client_ip: m.client_ip,
736 client_port: m.client_port as u16,
737 server_name: m.server_name,
738 protocol: m.protocol,
739 tls_version: m.tls_version,
740 tls_cipher: m.tls_cipher,
741 route_id: m.route_id,
742 upstream_id: m.upstream_id,
743 timestamp: m.timestamp,
744 traceparent: m.traceparent,
745 },
746 None => RequestMetadata {
747 correlation_id: String::new(),
748 request_id: String::new(),
749 client_ip: String::new(),
750 client_port: 0,
751 server_name: None,
752 protocol: String::new(),
753 tls_version: None,
754 tls_cipher: None,
755 route_id: None,
756 upstream_id: None,
757 timestamp: String::new(),
758 traceparent: None,
759 },
760 }
761 }
762
763 fn convert_response_to_grpc(response: AgentResponse) -> grpc::AgentResponse {
765 let decision = match response.decision {
766 Decision::Allow => Some(grpc::agent_response::Decision::Allow(
767 grpc::AllowDecision {},
768 )),
769 Decision::Block {
770 status,
771 body,
772 headers,
773 } => Some(grpc::agent_response::Decision::Block(grpc::BlockDecision {
774 status: status as u32,
775 body,
776 headers: headers.unwrap_or_default(),
777 })),
778 Decision::Redirect { url, status } => Some(grpc::agent_response::Decision::Redirect(
779 grpc::RedirectDecision {
780 url,
781 status: status as u32,
782 },
783 )),
784 Decision::Challenge {
785 challenge_type,
786 params,
787 } => Some(grpc::agent_response::Decision::Challenge(
788 grpc::ChallengeDecision {
789 challenge_type,
790 params,
791 },
792 )),
793 };
794
795 let request_headers: Vec<grpc::HeaderOp> = response
796 .request_headers
797 .into_iter()
798 .map(Self::convert_header_op_to_grpc)
799 .collect();
800
801 let response_headers: Vec<grpc::HeaderOp> = response
802 .response_headers
803 .into_iter()
804 .map(Self::convert_header_op_to_grpc)
805 .collect();
806
807 let audit = Some(grpc::AuditMetadata {
808 tags: response.audit.tags,
809 rule_ids: response.audit.rule_ids,
810 confidence: response.audit.confidence,
811 reason_codes: response.audit.reason_codes,
812 custom: response
813 .audit
814 .custom
815 .into_iter()
816 .map(|(k, v)| (k, v.to_string()))
817 .collect(),
818 });
819
820 let request_body_mutation = response.request_body_mutation.map(|m| grpc::BodyMutation {
822 data: m.data.map(|d| d.into_bytes()),
823 chunk_index: m.chunk_index,
824 });
825
826 let response_body_mutation = response.response_body_mutation.map(|m| grpc::BodyMutation {
827 data: m.data.map(|d| d.into_bytes()),
828 chunk_index: m.chunk_index,
829 });
830
831 let websocket_decision = response
833 .websocket_decision
834 .map(|ws_decision| match ws_decision {
835 WebSocketDecision::Allow => {
836 grpc::agent_response::WebsocketDecision::WebsocketAllow(
837 grpc::WebSocketAllowDecision {},
838 )
839 }
840 WebSocketDecision::Drop => grpc::agent_response::WebsocketDecision::WebsocketDrop(
841 grpc::WebSocketDropDecision {},
842 ),
843 WebSocketDecision::Close { code, reason } => {
844 grpc::agent_response::WebsocketDecision::WebsocketClose(
845 grpc::WebSocketCloseDecision {
846 code: code as u32,
847 reason,
848 },
849 )
850 }
851 });
852
853 grpc::AgentResponse {
854 version: PROTOCOL_VERSION,
855 decision,
856 request_headers,
857 response_headers,
858 routing_metadata: response.routing_metadata,
859 audit,
860 needs_more: response.needs_more,
861 request_body_mutation,
862 response_body_mutation,
863 websocket_decision,
864 }
865 }
866
867 fn convert_header_op_to_grpc(op: HeaderOp) -> grpc::HeaderOp {
869 let operation = match op {
870 HeaderOp::Set { name, value } => {
871 Some(grpc::header_op::Operation::Set(grpc::SetHeader {
872 name,
873 value,
874 }))
875 }
876 HeaderOp::Add { name, value } => {
877 Some(grpc::header_op::Operation::Add(grpc::AddHeader {
878 name,
879 value,
880 }))
881 }
882 HeaderOp::Remove { name } => {
883 Some(grpc::header_op::Operation::Remove(grpc::RemoveHeader {
884 name,
885 }))
886 }
887 };
888 grpc::HeaderOp { operation }
889 }
890}