1use async_trait::async_trait;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::net::{UnixListener, UnixStream};
12use tokio_stream::StreamExt;
13use tonic::{Request, Response, Status, Streaming};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::errors::AgentProtocolError;
17use crate::grpc::{
18 self, agent_processor_server::AgentProcessor, agent_processor_server::AgentProcessorServer,
19};
20use crate::protocol::{
21 AgentRequest, AgentResponse, AuditMetadata, Decision, EventType, HeaderOp,
22 RequestBodyChunkEvent, RequestCompleteEvent, RequestHeadersEvent, RequestMetadata,
23 ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketDecision, WebSocketFrameEvent,
24 MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
25};
26
/// Agent server that listens on a Unix domain socket and forwards each decoded
/// event to an [`AgentHandler`].
pub struct AgentServer {
    /// Identifier attached to this server's log records.
    id: String,
    /// Filesystem path the Unix listener is bound to.
    socket_path: std::path::PathBuf,
    /// Handler shared (via `Arc`) with every spawned connection task.
    handler: Arc<dyn AgentHandler>,
}
36
/// Callbacks invoked by the agent servers in this file, one per event type.
///
/// Every method has a default implementation, so an implementor only needs to
/// override the events it cares about. The defaults return
/// `AgentResponse::default_allow()`, except for WebSocket frames, which return
/// `AgentResponse::websocket_allow()`.
///
/// The `Send + Sync` bound lets a single handler be shared across connection
/// tasks behind an `Arc`.
#[async_trait]
pub trait AgentHandler: Send + Sync {
    /// Called for a request-headers event.
    async fn on_request_headers(&self, _event: RequestHeadersEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Called for each request body chunk event.
    async fn on_request_body_chunk(&self, _event: RequestBodyChunkEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Called for a response-headers event.
    async fn on_response_headers(&self, _event: ResponseHeadersEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Called for each response body chunk event.
    async fn on_response_body_chunk(&self, _event: ResponseBodyChunkEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Called for a request-complete event.
    async fn on_request_complete(&self, _event: RequestCompleteEvent) -> AgentResponse {
        AgentResponse::default_allow()
    }

    /// Called for each WebSocket frame event.
    ///
    /// Unlike the other callbacks, the default here is
    /// `AgentResponse::websocket_allow()`.
    async fn on_websocket_frame(&self, _event: WebSocketFrameEvent) -> AgentResponse {
        AgentResponse::websocket_allow()
    }
}
75
76impl AgentServer {
77 pub fn new(
79 id: impl Into<String>,
80 socket_path: impl Into<std::path::PathBuf>,
81 handler: Box<dyn AgentHandler>,
82 ) -> Self {
83 let id = id.into();
84 let socket_path = socket_path.into();
85
86 debug!(
87 agent_id = %id,
88 socket_path = %socket_path.display(),
89 "Creating agent server"
90 );
91
92 Self {
93 id,
94 socket_path,
95 handler: Arc::from(handler),
96 }
97 }
98
99 pub async fn run(&self) -> Result<(), AgentProtocolError> {
101 if self.socket_path.exists() {
103 trace!(
104 agent_id = %self.id,
105 socket_path = %self.socket_path.display(),
106 "Removing existing socket file"
107 );
108 std::fs::remove_file(&self.socket_path)?;
109 }
110
111 let listener = UnixListener::bind(&self.socket_path)?;
113
114 info!(
115 agent_id = %self.id,
116 socket_path = %self.socket_path.display(),
117 "Agent server listening"
118 );
119
120 loop {
121 match listener.accept().await {
122 Ok((stream, _addr)) => {
123 trace!(
124 agent_id = %self.id,
125 "Accepted new connection"
126 );
127 let handler = Arc::clone(&self.handler);
128 let agent_id = self.id.clone();
129 tokio::spawn(async move {
130 if let Err(e) = Self::handle_connection(stream, handler.as_ref()).await {
131 error!(
132 agent_id = %agent_id,
133 error = %e,
134 "Error handling agent connection"
135 );
136 }
137 });
138 }
139 Err(e) => {
140 error!(
141 agent_id = %self.id,
142 error = %e,
143 "Failed to accept connection"
144 );
145 }
146 }
147 }
148 }
149
150 async fn handle_connection(
152 mut stream: UnixStream,
153 handler: &dyn AgentHandler,
154 ) -> Result<(), AgentProtocolError> {
155 trace!("Starting connection handler");
156
157 loop {
158 let mut len_bytes = [0u8; 4];
160 match stream.read_exact(&mut len_bytes).await {
161 Ok(_) => {}
162 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
163 trace!("Client disconnected (EOF)");
165 return Ok(());
166 }
167 Err(e) => {
168 error!(error = %e, "Error reading message length");
169 return Err(e.into());
170 }
171 }
172
173 let message_len = u32::from_be_bytes(len_bytes) as usize;
174
175 if message_len > MAX_MESSAGE_SIZE {
177 warn!(
178 message_len = message_len,
179 max_size = MAX_MESSAGE_SIZE,
180 "Message too large"
181 );
182 return Err(AgentProtocolError::MessageTooLarge {
183 size: message_len,
184 max: MAX_MESSAGE_SIZE,
185 });
186 }
187
188 trace!(message_len = message_len, "Reading message data");
189
190 let mut buffer = vec![0u8; message_len];
192 stream.read_exact(&mut buffer).await?;
193
194 let request: AgentRequest = serde_json::from_slice(&buffer)
196 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
197
198 trace!(
199 event_type = ?request.event_type,
200 version = request.version,
201 "Received agent request"
202 );
203
204 let response = match request.event_type {
206 EventType::RequestHeaders => {
207 let event: RequestHeadersEvent = serde_json::from_value(request.payload)
208 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
209 trace!(
210 correlation_id = %event.metadata.correlation_id,
211 method = %event.method,
212 uri = %event.uri,
213 "Processing request_headers event"
214 );
215 handler.on_request_headers(event).await
216 }
217 EventType::RequestBodyChunk => {
218 let event: RequestBodyChunkEvent = serde_json::from_value(request.payload)
219 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
220 trace!(
221 correlation_id = %event.correlation_id,
222 is_last = event.is_last,
223 data_len = event.data.len(),
224 "Processing request_body_chunk event"
225 );
226 handler.on_request_body_chunk(event).await
227 }
228 EventType::ResponseHeaders => {
229 let event: ResponseHeadersEvent = serde_json::from_value(request.payload)
230 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
231 trace!(
232 correlation_id = %event.correlation_id,
233 status = event.status,
234 "Processing response_headers event"
235 );
236 handler.on_response_headers(event).await
237 }
238 EventType::ResponseBodyChunk => {
239 let event: ResponseBodyChunkEvent = serde_json::from_value(request.payload)
240 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
241 trace!(
242 correlation_id = %event.correlation_id,
243 is_last = event.is_last,
244 data_len = event.data.len(),
245 "Processing response_body_chunk event"
246 );
247 handler.on_response_body_chunk(event).await
248 }
249 EventType::RequestComplete => {
250 let event: RequestCompleteEvent = serde_json::from_value(request.payload)
251 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
252 trace!(
253 correlation_id = %event.correlation_id,
254 status = event.status,
255 duration_ms = event.duration_ms,
256 "Processing request_complete event"
257 );
258 handler.on_request_complete(event).await
259 }
260 EventType::WebSocketFrame => {
261 let event: WebSocketFrameEvent = serde_json::from_value(request.payload)
262 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
263 trace!(
264 correlation_id = %event.correlation_id,
265 opcode = %event.opcode,
266 frame_index = event.frame_index,
267 client_to_server = event.client_to_server,
268 "Processing websocket_frame event"
269 );
270 handler.on_websocket_frame(event).await
271 }
272 };
273
274 trace!(
275 decision = ?response.decision,
276 "Sending agent response"
277 );
278
279 let response_bytes = serde_json::to_vec(&response)
281 .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?;
282
283 let len_bytes = (response_bytes.len() as u32).to_be_bytes();
285 stream.write_all(&len_bytes).await?;
286 stream.write_all(&response_bytes).await?;
288 stream.flush().await?;
289
290 trace!(response_len = response_bytes.len(), "Response sent");
291 }
292 }
293}
294
295pub struct EchoAgent;
297
298#[async_trait]
299impl AgentHandler for EchoAgent {
300 async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
301 debug!(
302 "Echo agent: request headers for {}",
303 event.metadata.correlation_id
304 );
305
306 AgentResponse::default_allow()
308 .add_request_header(HeaderOp::Set {
309 name: "X-Echo-Agent".to_string(),
310 value: event.metadata.correlation_id.clone(),
311 })
312 .with_audit(AuditMetadata {
313 tags: vec!["echo".to_string()],
314 ..Default::default()
315 })
316 }
317}
318
319pub struct DenylistAgent {
321 blocked_paths: Vec<String>,
322 blocked_ips: Vec<String>,
323}
324
325impl DenylistAgent {
326 pub fn new(blocked_paths: Vec<String>, blocked_ips: Vec<String>) -> Self {
327 Self {
328 blocked_paths,
329 blocked_ips,
330 }
331 }
332}
333
334#[async_trait]
335impl AgentHandler for DenylistAgent {
336 async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
337 trace!(
338 correlation_id = %event.metadata.correlation_id,
339 uri = %event.uri,
340 client_ip = %event.metadata.client_ip,
341 "Denylist agent checking request"
342 );
343
344 for blocked_path in &self.blocked_paths {
346 if event.uri.starts_with(blocked_path) {
347 debug!(
348 correlation_id = %event.metadata.correlation_id,
349 blocked_path = %blocked_path,
350 uri = %event.uri,
351 "Blocking request: path matched denylist"
352 );
353 return AgentResponse::block(403, Some("Forbidden path".to_string())).with_audit(
354 AuditMetadata {
355 tags: vec!["denylist".to_string(), "blocked_path".to_string()],
356 reason_codes: vec!["PATH_BLOCKED".to_string()],
357 ..Default::default()
358 },
359 );
360 }
361 }
362
363 if self.blocked_ips.contains(&event.metadata.client_ip) {
365 debug!(
366 correlation_id = %event.metadata.correlation_id,
367 client_ip = %event.metadata.client_ip,
368 "Blocking request: IP matched denylist"
369 );
370 return AgentResponse::block(403, Some("Forbidden IP".to_string())).with_audit(
371 AuditMetadata {
372 tags: vec!["denylist".to_string(), "blocked_ip".to_string()],
373 reason_codes: vec!["IP_BLOCKED".to_string()],
374 ..Default::default()
375 },
376 );
377 }
378
379 trace!(
380 correlation_id = %event.metadata.correlation_id,
381 "Request allowed by denylist agent"
382 );
383 AgentResponse::default_allow()
384 }
385}
386
387pub struct GrpcAgentServer {
393 id: String,
395 handler: Arc<dyn AgentHandler>,
397}
398
399impl GrpcAgentServer {
400 pub fn new(id: impl Into<String>, handler: Box<dyn AgentHandler>) -> Self {
402 let id = id.into();
403 debug!(agent_id = %id, "Creating gRPC agent server");
404 Self {
405 id,
406 handler: Arc::from(handler),
407 }
408 }
409
410 pub fn into_service(self) -> AgentProcessorServer<GrpcAgentHandler> {
412 trace!(agent_id = %self.id, "Converting to tonic service");
413 AgentProcessorServer::new(GrpcAgentHandler {
414 id: self.id,
415 handler: self.handler,
416 })
417 }
418
419 pub async fn run(self, addr: SocketAddr) -> Result<(), AgentProtocolError> {
421 info!(
422 agent_id = %self.id,
423 address = %addr,
424 "gRPC agent server listening"
425 );
426
427 tonic::transport::Server::builder()
428 .add_service(self.into_service())
429 .serve(addr)
430 .await
431 .map_err(|e| {
432 error!(error = %e, "gRPC server error");
433 AgentProtocolError::ConnectionFailed(format!("gRPC server error: {}", e))
434 })
435 }
436}
437
/// Adapter that implements the tonic `AgentProcessor` service on top of an
/// [`AgentHandler`], converting between gRPC and internal protocol types.
pub struct GrpcAgentHandler {
    /// Identifier attached to this handler's log records.
    id: String,
    /// Handler that receives the converted events.
    handler: Arc<dyn AgentHandler>,
}
443
444#[tonic::async_trait]
445impl AgentProcessor for GrpcAgentHandler {
446 async fn process_event(
447 &self,
448 request: Request<grpc::AgentRequest>,
449 ) -> Result<Response<grpc::AgentResponse>, Status> {
450 let grpc_request = request.into_inner();
451
452 trace!(
453 agent_id = %self.id,
454 event_type = grpc_request.event_type,
455 version = grpc_request.version,
456 "Processing gRPC event"
457 );
458
459 let response = match grpc_request.event {
461 Some(grpc::agent_request::Event::RequestHeaders(e)) => {
462 let event = Self::convert_request_headers_from_grpc(e);
463 trace!(
464 agent_id = %self.id,
465 correlation_id = %event.metadata.correlation_id,
466 "Processing request_headers via gRPC"
467 );
468 self.handler.on_request_headers(event).await
469 }
470 Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
471 let event = Self::convert_request_body_chunk_from_grpc(e);
472 trace!(
473 agent_id = %self.id,
474 correlation_id = %event.correlation_id,
475 "Processing request_body_chunk via gRPC"
476 );
477 self.handler.on_request_body_chunk(event).await
478 }
479 Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
480 let event = Self::convert_response_headers_from_grpc(e);
481 trace!(
482 agent_id = %self.id,
483 correlation_id = %event.correlation_id,
484 "Processing response_headers via gRPC"
485 );
486 self.handler.on_response_headers(event).await
487 }
488 Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
489 let event = Self::convert_response_body_chunk_from_grpc(e);
490 trace!(
491 agent_id = %self.id,
492 correlation_id = %event.correlation_id,
493 "Processing response_body_chunk via gRPC"
494 );
495 self.handler.on_response_body_chunk(event).await
496 }
497 Some(grpc::agent_request::Event::RequestComplete(e)) => {
498 let event = Self::convert_request_complete_from_grpc(e);
499 trace!(
500 agent_id = %self.id,
501 correlation_id = %event.correlation_id,
502 "Processing request_complete via gRPC"
503 );
504 self.handler.on_request_complete(event).await
505 }
506 Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
507 let event = Self::convert_websocket_frame_from_grpc(e);
508 trace!(
509 agent_id = %self.id,
510 correlation_id = %event.correlation_id,
511 opcode = %event.opcode,
512 "Processing websocket_frame via gRPC"
513 );
514 self.handler.on_websocket_frame(event).await
515 }
516 None => {
517 warn!(agent_id = %self.id, "Missing event in gRPC request");
518 return Err(Status::invalid_argument("Missing event in request"));
519 }
520 };
521
522 trace!(
523 agent_id = %self.id,
524 decision = ?response.decision,
525 "Returning gRPC response"
526 );
527
528 let grpc_response = Self::convert_response_to_grpc(response);
530 Ok(Response::new(grpc_response))
531 }
532
533 async fn process_event_stream(
534 &self,
535 request: Request<Streaming<grpc::AgentRequest>>,
536 ) -> Result<Response<grpc::AgentResponse>, Status> {
537 let mut stream = request.into_inner();
538
539 trace!(agent_id = %self.id, "Processing gRPC event stream");
540
541 let mut final_response = AgentResponse::default_allow();
543 let mut event_count = 0u32;
544
545 while let Some(result) = stream.next().await {
546 let grpc_request = result.map_err(|e| {
547 error!(agent_id = %self.id, error = %e, "Stream error");
548 Status::internal(format!("Stream error: {}", e))
549 })?;
550
551 event_count += 1;
552 trace!(
553 agent_id = %self.id,
554 event_count = event_count,
555 "Processing stream event"
556 );
557
558 let response = match grpc_request.event {
559 Some(grpc::agent_request::Event::RequestHeaders(e)) => {
560 let event = Self::convert_request_headers_from_grpc(e);
561 self.handler.on_request_headers(event).await
562 }
563 Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
564 let event = Self::convert_request_body_chunk_from_grpc(e);
565 self.handler.on_request_body_chunk(event).await
566 }
567 Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
568 let event = Self::convert_response_headers_from_grpc(e);
569 self.handler.on_response_headers(event).await
570 }
571 Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
572 let event = Self::convert_response_body_chunk_from_grpc(e);
573 self.handler.on_response_body_chunk(event).await
574 }
575 Some(grpc::agent_request::Event::RequestComplete(e)) => {
576 let event = Self::convert_request_complete_from_grpc(e);
577 self.handler.on_request_complete(event).await
578 }
579 Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
580 let event = Self::convert_websocket_frame_from_grpc(e);
581 self.handler.on_websocket_frame(event).await
582 }
583 None => continue,
584 };
585
586 if !matches!(response.decision, Decision::Allow) {
588 debug!(
589 agent_id = %self.id,
590 decision = ?response.decision,
591 event_count = event_count,
592 "Non-allow decision in stream, terminating early"
593 );
594 final_response = response;
595 break;
596 }
597 final_response = response;
598 }
599
600 trace!(
601 agent_id = %self.id,
602 event_count = event_count,
603 decision = ?final_response.decision,
604 "Stream processing complete"
605 );
606
607 let grpc_response = Self::convert_response_to_grpc(final_response);
608 Ok(Response::new(grpc_response))
609 }
610}
611
612impl GrpcAgentHandler {
613 fn convert_request_headers_from_grpc(e: grpc::RequestHeadersEvent) -> RequestHeadersEvent {
615 RequestHeadersEvent {
616 metadata: Self::convert_metadata_from_grpc(e.metadata),
617 method: e.method,
618 uri: e.uri,
619 headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
620 }
621 }
622
623 fn convert_request_body_chunk_from_grpc(
625 e: grpc::RequestBodyChunkEvent,
626 ) -> RequestBodyChunkEvent {
627 RequestBodyChunkEvent {
628 correlation_id: e.correlation_id,
629 data: String::from_utf8_lossy(&e.data).to_string(),
630 is_last: e.is_last,
631 total_size: e.total_size.map(|s| s as usize),
632 chunk_index: e.chunk_index,
633 bytes_received: e.bytes_received as usize,
634 }
635 }
636
637 fn convert_response_headers_from_grpc(e: grpc::ResponseHeadersEvent) -> ResponseHeadersEvent {
639 ResponseHeadersEvent {
640 correlation_id: e.correlation_id,
641 status: e.status as u16,
642 headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
643 }
644 }
645
646 fn convert_response_body_chunk_from_grpc(
648 e: grpc::ResponseBodyChunkEvent,
649 ) -> ResponseBodyChunkEvent {
650 ResponseBodyChunkEvent {
651 correlation_id: e.correlation_id,
652 data: String::from_utf8_lossy(&e.data).to_string(),
653 is_last: e.is_last,
654 total_size: e.total_size.map(|s| s as usize),
655 chunk_index: e.chunk_index,
656 bytes_sent: e.bytes_sent as usize,
657 }
658 }
659
660 fn convert_request_complete_from_grpc(e: grpc::RequestCompleteEvent) -> RequestCompleteEvent {
662 RequestCompleteEvent {
663 correlation_id: e.correlation_id,
664 status: e.status as u16,
665 duration_ms: e.duration_ms,
666 request_body_size: e.request_body_size as usize,
667 response_body_size: e.response_body_size as usize,
668 upstream_attempts: e.upstream_attempts,
669 error: e.error,
670 }
671 }
672
673 fn convert_websocket_frame_from_grpc(e: grpc::WebSocketFrameEvent) -> WebSocketFrameEvent {
675 use base64::{engine::general_purpose::STANDARD, Engine as _};
676 WebSocketFrameEvent {
677 correlation_id: e.correlation_id,
678 opcode: e.opcode,
679 data: STANDARD.encode(&e.data),
680 client_to_server: e.client_to_server,
681 frame_index: e.frame_index,
682 fin: e.fin,
683 route_id: e.route_id,
684 client_ip: e.client_ip,
685 }
686 }
687
688 fn convert_metadata_from_grpc(metadata: Option<grpc::RequestMetadata>) -> RequestMetadata {
690 match metadata {
691 Some(m) => RequestMetadata {
692 correlation_id: m.correlation_id,
693 request_id: m.request_id,
694 client_ip: m.client_ip,
695 client_port: m.client_port as u16,
696 server_name: m.server_name,
697 protocol: m.protocol,
698 tls_version: m.tls_version,
699 tls_cipher: m.tls_cipher,
700 route_id: m.route_id,
701 upstream_id: m.upstream_id,
702 timestamp: m.timestamp,
703 },
704 None => RequestMetadata {
705 correlation_id: String::new(),
706 request_id: String::new(),
707 client_ip: String::new(),
708 client_port: 0,
709 server_name: None,
710 protocol: String::new(),
711 tls_version: None,
712 tls_cipher: None,
713 route_id: None,
714 upstream_id: None,
715 timestamp: String::new(),
716 },
717 }
718 }
719
720 fn convert_response_to_grpc(response: AgentResponse) -> grpc::AgentResponse {
722 let decision = match response.decision {
723 Decision::Allow => Some(grpc::agent_response::Decision::Allow(
724 grpc::AllowDecision {},
725 )),
726 Decision::Block {
727 status,
728 body,
729 headers,
730 } => Some(grpc::agent_response::Decision::Block(grpc::BlockDecision {
731 status: status as u32,
732 body,
733 headers: headers.unwrap_or_default(),
734 })),
735 Decision::Redirect { url, status } => Some(grpc::agent_response::Decision::Redirect(
736 grpc::RedirectDecision {
737 url,
738 status: status as u32,
739 },
740 )),
741 Decision::Challenge {
742 challenge_type,
743 params,
744 } => Some(grpc::agent_response::Decision::Challenge(
745 grpc::ChallengeDecision {
746 challenge_type,
747 params,
748 },
749 )),
750 };
751
752 let request_headers: Vec<grpc::HeaderOp> = response
753 .request_headers
754 .into_iter()
755 .map(Self::convert_header_op_to_grpc)
756 .collect();
757
758 let response_headers: Vec<grpc::HeaderOp> = response
759 .response_headers
760 .into_iter()
761 .map(Self::convert_header_op_to_grpc)
762 .collect();
763
764 let audit = Some(grpc::AuditMetadata {
765 tags: response.audit.tags,
766 rule_ids: response.audit.rule_ids,
767 confidence: response.audit.confidence,
768 reason_codes: response.audit.reason_codes,
769 custom: response
770 .audit
771 .custom
772 .into_iter()
773 .map(|(k, v)| (k, v.to_string()))
774 .collect(),
775 });
776
777 let request_body_mutation = response.request_body_mutation.map(|m| grpc::BodyMutation {
779 data: m.data.map(|d| d.into_bytes()),
780 chunk_index: m.chunk_index,
781 });
782
783 let response_body_mutation = response.response_body_mutation.map(|m| grpc::BodyMutation {
784 data: m.data.map(|d| d.into_bytes()),
785 chunk_index: m.chunk_index,
786 });
787
788 let websocket_decision = response
790 .websocket_decision
791 .map(|ws_decision| match ws_decision {
792 WebSocketDecision::Allow => {
793 grpc::agent_response::WebsocketDecision::WebsocketAllow(
794 grpc::WebSocketAllowDecision {},
795 )
796 }
797 WebSocketDecision::Drop => grpc::agent_response::WebsocketDecision::WebsocketDrop(
798 grpc::WebSocketDropDecision {},
799 ),
800 WebSocketDecision::Close { code, reason } => {
801 grpc::agent_response::WebsocketDecision::WebsocketClose(
802 grpc::WebSocketCloseDecision {
803 code: code as u32,
804 reason,
805 },
806 )
807 }
808 });
809
810 grpc::AgentResponse {
811 version: PROTOCOL_VERSION,
812 decision,
813 request_headers,
814 response_headers,
815 routing_metadata: response.routing_metadata,
816 audit,
817 needs_more: response.needs_more,
818 request_body_mutation,
819 response_body_mutation,
820 websocket_decision,
821 }
822 }
823
824 fn convert_header_op_to_grpc(op: HeaderOp) -> grpc::HeaderOp {
826 let operation = match op {
827 HeaderOp::Set { name, value } => {
828 Some(grpc::header_op::Operation::Set(grpc::SetHeader {
829 name,
830 value,
831 }))
832 }
833 HeaderOp::Add { name, value } => {
834 Some(grpc::header_op::Operation::Add(grpc::AddHeader {
835 name,
836 value,
837 }))
838 }
839 HeaderOp::Remove { name } => {
840 Some(grpc::header_op::Operation::Remove(grpc::RemoveHeader {
841 name,
842 }))
843 }
844 };
845 grpc::HeaderOp { operation }
846 }
847}