1use async_trait::async_trait;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::net::{UnixListener, UnixStream};
12use tokio_stream::StreamExt;
13use tonic::{Request, Response, Status, Streaming};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::errors::AgentProtocolError;
17use crate::grpc::{
18 self, agent_processor_server::AgentProcessor, agent_processor_server::AgentProcessorServer,
19};
20use crate::protocol::{
21 AgentRequest, AgentResponse, AuditMetadata, ConfigureEvent, Decision, EventType, HeaderOp,
22 RequestBodyChunkEvent, RequestCompleteEvent, RequestHeadersEvent, RequestMetadata,
23 ResponseBodyChunkEvent, ResponseHeadersEvent, WebSocketDecision, WebSocketFrameEvent,
24 MAX_MESSAGE_SIZE, PROTOCOL_VERSION,
25};
26
27pub struct AgentServer {
29 id: String,
31 socket_path: std::path::PathBuf,
33 handler: Arc<dyn AgentHandler>,
35}
36
37#[async_trait]
39pub trait AgentHandler: Send + Sync {
40 async fn on_configure(&self, _event: ConfigureEvent) -> AgentResponse {
48 AgentResponse::default_allow()
49 }
50
51 async fn on_request_headers(&self, _event: RequestHeadersEvent) -> AgentResponse {
53 AgentResponse::default_allow()
54 }
55
56 async fn on_request_body_chunk(&self, _event: RequestBodyChunkEvent) -> AgentResponse {
58 AgentResponse::default_allow()
59 }
60
61 async fn on_response_headers(&self, _event: ResponseHeadersEvent) -> AgentResponse {
63 AgentResponse::default_allow()
64 }
65
66 async fn on_response_body_chunk(&self, _event: ResponseBodyChunkEvent) -> AgentResponse {
68 AgentResponse::default_allow()
69 }
70
71 async fn on_request_complete(&self, _event: RequestCompleteEvent) -> AgentResponse {
73 AgentResponse::default_allow()
74 }
75
76 async fn on_websocket_frame(&self, _event: WebSocketFrameEvent) -> AgentResponse {
83 AgentResponse::websocket_allow()
84 }
85}
86
87impl AgentServer {
88 pub fn new(
90 id: impl Into<String>,
91 socket_path: impl Into<std::path::PathBuf>,
92 handler: Box<dyn AgentHandler>,
93 ) -> Self {
94 let id = id.into();
95 let socket_path = socket_path.into();
96
97 debug!(
98 agent_id = %id,
99 socket_path = %socket_path.display(),
100 "Creating agent server"
101 );
102
103 Self {
104 id,
105 socket_path,
106 handler: Arc::from(handler),
107 }
108 }
109
110 pub async fn run(&self) -> Result<(), AgentProtocolError> {
112 if self.socket_path.exists() {
114 trace!(
115 agent_id = %self.id,
116 socket_path = %self.socket_path.display(),
117 "Removing existing socket file"
118 );
119 std::fs::remove_file(&self.socket_path)?;
120 }
121
122 let listener = UnixListener::bind(&self.socket_path)?;
124
125 info!(
126 agent_id = %self.id,
127 socket_path = %self.socket_path.display(),
128 "Agent server listening"
129 );
130
131 loop {
132 match listener.accept().await {
133 Ok((stream, _addr)) => {
134 trace!(
135 agent_id = %self.id,
136 "Accepted new connection"
137 );
138 let handler = Arc::clone(&self.handler);
139 let agent_id = self.id.clone();
140 tokio::spawn(async move {
141 if let Err(e) = Self::handle_connection(stream, handler.as_ref()).await {
142 error!(
143 agent_id = %agent_id,
144 error = %e,
145 "Error handling agent connection"
146 );
147 }
148 });
149 }
150 Err(e) => {
151 error!(
152 agent_id = %self.id,
153 error = %e,
154 "Failed to accept connection"
155 );
156 }
157 }
158 }
159 }
160
161 async fn handle_connection(
163 mut stream: UnixStream,
164 handler: &dyn AgentHandler,
165 ) -> Result<(), AgentProtocolError> {
166 trace!("Starting connection handler");
167
168 loop {
169 let mut len_bytes = [0u8; 4];
171 match stream.read_exact(&mut len_bytes).await {
172 Ok(_) => {}
173 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
174 trace!("Client disconnected (EOF)");
176 return Ok(());
177 }
178 Err(e) => {
179 error!(error = %e, "Error reading message length");
180 return Err(e.into());
181 }
182 }
183
184 let message_len = u32::from_be_bytes(len_bytes) as usize;
185
186 if message_len > MAX_MESSAGE_SIZE {
188 warn!(
189 message_len = message_len,
190 max_size = MAX_MESSAGE_SIZE,
191 "Message too large"
192 );
193 return Err(AgentProtocolError::MessageTooLarge {
194 size: message_len,
195 max: MAX_MESSAGE_SIZE,
196 });
197 }
198
199 trace!(message_len = message_len, "Reading message data");
200
201 let mut buffer = vec![0u8; message_len];
203 stream.read_exact(&mut buffer).await?;
204
205 let request: AgentRequest = serde_json::from_slice(&buffer)
207 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
208
209 trace!(
210 event_type = ?request.event_type,
211 version = request.version,
212 "Received agent request"
213 );
214
215 let response = match request.event_type {
217 EventType::Configure => {
218 let event: ConfigureEvent = serde_json::from_value(request.payload)
219 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
220 trace!(
221 agent_id = %event.agent_id,
222 "Processing configure event"
223 );
224 handler.on_configure(event).await
225 }
226 EventType::RequestHeaders => {
227 let event: RequestHeadersEvent = serde_json::from_value(request.payload)
228 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
229 trace!(
230 correlation_id = %event.metadata.correlation_id,
231 method = %event.method,
232 uri = %event.uri,
233 "Processing request_headers event"
234 );
235 handler.on_request_headers(event).await
236 }
237 EventType::RequestBodyChunk => {
238 let event: RequestBodyChunkEvent = serde_json::from_value(request.payload)
239 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
240 trace!(
241 correlation_id = %event.correlation_id,
242 is_last = event.is_last,
243 data_len = event.data.len(),
244 "Processing request_body_chunk event"
245 );
246 handler.on_request_body_chunk(event).await
247 }
248 EventType::ResponseHeaders => {
249 let event: ResponseHeadersEvent = serde_json::from_value(request.payload)
250 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
251 trace!(
252 correlation_id = %event.correlation_id,
253 status = event.status,
254 "Processing response_headers event"
255 );
256 handler.on_response_headers(event).await
257 }
258 EventType::ResponseBodyChunk => {
259 let event: ResponseBodyChunkEvent = serde_json::from_value(request.payload)
260 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
261 trace!(
262 correlation_id = %event.correlation_id,
263 is_last = event.is_last,
264 data_len = event.data.len(),
265 "Processing response_body_chunk event"
266 );
267 handler.on_response_body_chunk(event).await
268 }
269 EventType::RequestComplete => {
270 let event: RequestCompleteEvent = serde_json::from_value(request.payload)
271 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
272 trace!(
273 correlation_id = %event.correlation_id,
274 status = event.status,
275 duration_ms = event.duration_ms,
276 "Processing request_complete event"
277 );
278 handler.on_request_complete(event).await
279 }
280 EventType::WebSocketFrame => {
281 let event: WebSocketFrameEvent = serde_json::from_value(request.payload)
282 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
283 trace!(
284 correlation_id = %event.correlation_id,
285 opcode = %event.opcode,
286 frame_index = event.frame_index,
287 client_to_server = event.client_to_server,
288 "Processing websocket_frame event"
289 );
290 handler.on_websocket_frame(event).await
291 }
292 };
293
294 trace!(
295 decision = ?response.decision,
296 "Sending agent response"
297 );
298
299 let response_bytes = serde_json::to_vec(&response)
301 .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?;
302
303 let len_bytes = (response_bytes.len() as u32).to_be_bytes();
305 stream.write_all(&len_bytes).await?;
306 stream.write_all(&response_bytes).await?;
308 stream.flush().await?;
309
310 trace!(response_len = response_bytes.len(), "Response sent");
311 }
312 }
313}
314
315pub struct EchoAgent;
317
318#[async_trait]
319impl AgentHandler for EchoAgent {
320 async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
321 debug!(
322 "Echo agent: request headers for {}",
323 event.metadata.correlation_id
324 );
325
326 AgentResponse::default_allow()
328 .add_request_header(HeaderOp::Set {
329 name: "X-Echo-Agent".to_string(),
330 value: event.metadata.correlation_id.clone(),
331 })
332 .with_audit(AuditMetadata {
333 tags: vec!["echo".to_string()],
334 ..Default::default()
335 })
336 }
337}
338
339pub struct DenylistAgent {
341 blocked_paths: Vec<String>,
342 blocked_ips: Vec<String>,
343}
344
345impl DenylistAgent {
346 pub fn new(blocked_paths: Vec<String>, blocked_ips: Vec<String>) -> Self {
347 Self {
348 blocked_paths,
349 blocked_ips,
350 }
351 }
352}
353
354#[async_trait]
355impl AgentHandler for DenylistAgent {
356 async fn on_request_headers(&self, event: RequestHeadersEvent) -> AgentResponse {
357 trace!(
358 correlation_id = %event.metadata.correlation_id,
359 uri = %event.uri,
360 client_ip = %event.metadata.client_ip,
361 "Denylist agent checking request"
362 );
363
364 for blocked_path in &self.blocked_paths {
366 if event.uri.starts_with(blocked_path) {
367 debug!(
368 correlation_id = %event.metadata.correlation_id,
369 blocked_path = %blocked_path,
370 uri = %event.uri,
371 "Blocking request: path matched denylist"
372 );
373 return AgentResponse::block(403, Some("Forbidden path".to_string())).with_audit(
374 AuditMetadata {
375 tags: vec!["denylist".to_string(), "blocked_path".to_string()],
376 reason_codes: vec!["PATH_BLOCKED".to_string()],
377 ..Default::default()
378 },
379 );
380 }
381 }
382
383 if self.blocked_ips.contains(&event.metadata.client_ip) {
385 debug!(
386 correlation_id = %event.metadata.correlation_id,
387 client_ip = %event.metadata.client_ip,
388 "Blocking request: IP matched denylist"
389 );
390 return AgentResponse::block(403, Some("Forbidden IP".to_string())).with_audit(
391 AuditMetadata {
392 tags: vec!["denylist".to_string(), "blocked_ip".to_string()],
393 reason_codes: vec!["IP_BLOCKED".to_string()],
394 ..Default::default()
395 },
396 );
397 }
398
399 trace!(
400 correlation_id = %event.metadata.correlation_id,
401 "Request allowed by denylist agent"
402 );
403 AgentResponse::default_allow()
404 }
405}
406
407pub struct GrpcAgentServer {
413 id: String,
415 handler: Arc<dyn AgentHandler>,
417}
418
419impl GrpcAgentServer {
420 pub fn new(id: impl Into<String>, handler: Box<dyn AgentHandler>) -> Self {
422 let id = id.into();
423 debug!(agent_id = %id, "Creating gRPC agent server");
424 Self {
425 id,
426 handler: Arc::from(handler),
427 }
428 }
429
430 pub fn into_service(self) -> AgentProcessorServer<GrpcAgentHandler> {
432 trace!(agent_id = %self.id, "Converting to tonic service");
433 AgentProcessorServer::new(GrpcAgentHandler {
434 id: self.id,
435 handler: self.handler,
436 })
437 }
438
439 pub async fn run(self, addr: SocketAddr) -> Result<(), AgentProtocolError> {
441 info!(
442 agent_id = %self.id,
443 address = %addr,
444 "gRPC agent server listening"
445 );
446
447 tonic::transport::Server::builder()
448 .add_service(self.into_service())
449 .serve(addr)
450 .await
451 .map_err(|e| {
452 error!(error = %e, "gRPC server error");
453 AgentProtocolError::ConnectionFailed(format!("gRPC server error: {}", e))
454 })
455 }
456}
457
458pub struct GrpcAgentHandler {
460 id: String,
461 handler: Arc<dyn AgentHandler>,
462}
463
464#[tonic::async_trait]
465impl AgentProcessor for GrpcAgentHandler {
466 async fn process_event(
467 &self,
468 request: Request<grpc::AgentRequest>,
469 ) -> Result<Response<grpc::AgentResponse>, Status> {
470 let grpc_request = request.into_inner();
471
472 trace!(
473 agent_id = %self.id,
474 event_type = grpc_request.event_type,
475 version = grpc_request.version,
476 "Processing gRPC event"
477 );
478
479 let response = match grpc_request.event {
481 Some(grpc::agent_request::Event::RequestHeaders(e)) => {
482 let event = Self::convert_request_headers_from_grpc(e);
483 trace!(
484 agent_id = %self.id,
485 correlation_id = %event.metadata.correlation_id,
486 "Processing request_headers via gRPC"
487 );
488 self.handler.on_request_headers(event).await
489 }
490 Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
491 let event = Self::convert_request_body_chunk_from_grpc(e);
492 trace!(
493 agent_id = %self.id,
494 correlation_id = %event.correlation_id,
495 "Processing request_body_chunk via gRPC"
496 );
497 self.handler.on_request_body_chunk(event).await
498 }
499 Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
500 let event = Self::convert_response_headers_from_grpc(e);
501 trace!(
502 agent_id = %self.id,
503 correlation_id = %event.correlation_id,
504 "Processing response_headers via gRPC"
505 );
506 self.handler.on_response_headers(event).await
507 }
508 Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
509 let event = Self::convert_response_body_chunk_from_grpc(e);
510 trace!(
511 agent_id = %self.id,
512 correlation_id = %event.correlation_id,
513 "Processing response_body_chunk via gRPC"
514 );
515 self.handler.on_response_body_chunk(event).await
516 }
517 Some(grpc::agent_request::Event::RequestComplete(e)) => {
518 let event = Self::convert_request_complete_from_grpc(e);
519 trace!(
520 agent_id = %self.id,
521 correlation_id = %event.correlation_id,
522 "Processing request_complete via gRPC"
523 );
524 self.handler.on_request_complete(event).await
525 }
526 Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
527 let event = Self::convert_websocket_frame_from_grpc(e);
528 trace!(
529 agent_id = %self.id,
530 correlation_id = %event.correlation_id,
531 opcode = %event.opcode,
532 "Processing websocket_frame via gRPC"
533 );
534 self.handler.on_websocket_frame(event).await
535 }
536 None => {
537 warn!(agent_id = %self.id, "Missing event in gRPC request");
538 return Err(Status::invalid_argument("Missing event in request"));
539 }
540 };
541
542 trace!(
543 agent_id = %self.id,
544 decision = ?response.decision,
545 "Returning gRPC response"
546 );
547
548 let grpc_response = Self::convert_response_to_grpc(response);
550 Ok(Response::new(grpc_response))
551 }
552
553 async fn process_event_stream(
554 &self,
555 request: Request<Streaming<grpc::AgentRequest>>,
556 ) -> Result<Response<grpc::AgentResponse>, Status> {
557 let mut stream = request.into_inner();
558
559 trace!(agent_id = %self.id, "Processing gRPC event stream");
560
561 let mut final_response = AgentResponse::default_allow();
563 let mut event_count = 0u32;
564
565 while let Some(result) = stream.next().await {
566 let grpc_request = result.map_err(|e| {
567 error!(agent_id = %self.id, error = %e, "Stream error");
568 Status::internal(format!("Stream error: {}", e))
569 })?;
570
571 event_count += 1;
572 trace!(
573 agent_id = %self.id,
574 event_count = event_count,
575 "Processing stream event"
576 );
577
578 let response = match grpc_request.event {
579 Some(grpc::agent_request::Event::RequestHeaders(e)) => {
580 let event = Self::convert_request_headers_from_grpc(e);
581 self.handler.on_request_headers(event).await
582 }
583 Some(grpc::agent_request::Event::RequestBodyChunk(e)) => {
584 let event = Self::convert_request_body_chunk_from_grpc(e);
585 self.handler.on_request_body_chunk(event).await
586 }
587 Some(grpc::agent_request::Event::ResponseHeaders(e)) => {
588 let event = Self::convert_response_headers_from_grpc(e);
589 self.handler.on_response_headers(event).await
590 }
591 Some(grpc::agent_request::Event::ResponseBodyChunk(e)) => {
592 let event = Self::convert_response_body_chunk_from_grpc(e);
593 self.handler.on_response_body_chunk(event).await
594 }
595 Some(grpc::agent_request::Event::RequestComplete(e)) => {
596 let event = Self::convert_request_complete_from_grpc(e);
597 self.handler.on_request_complete(event).await
598 }
599 Some(grpc::agent_request::Event::WebsocketFrame(e)) => {
600 let event = Self::convert_websocket_frame_from_grpc(e);
601 self.handler.on_websocket_frame(event).await
602 }
603 None => continue,
604 };
605
606 if !matches!(response.decision, Decision::Allow) {
608 debug!(
609 agent_id = %self.id,
610 decision = ?response.decision,
611 event_count = event_count,
612 "Non-allow decision in stream, terminating early"
613 );
614 final_response = response;
615 break;
616 }
617 final_response = response;
618 }
619
620 trace!(
621 agent_id = %self.id,
622 event_count = event_count,
623 decision = ?final_response.decision,
624 "Stream processing complete"
625 );
626
627 let grpc_response = Self::convert_response_to_grpc(final_response);
628 Ok(Response::new(grpc_response))
629 }
630}
631
632impl GrpcAgentHandler {
633 fn convert_request_headers_from_grpc(e: grpc::RequestHeadersEvent) -> RequestHeadersEvent {
635 RequestHeadersEvent {
636 metadata: Self::convert_metadata_from_grpc(e.metadata),
637 method: e.method,
638 uri: e.uri,
639 headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
640 }
641 }
642
643 fn convert_request_body_chunk_from_grpc(
645 e: grpc::RequestBodyChunkEvent,
646 ) -> RequestBodyChunkEvent {
647 RequestBodyChunkEvent {
648 correlation_id: e.correlation_id,
649 data: String::from_utf8_lossy(&e.data).to_string(),
650 is_last: e.is_last,
651 total_size: e.total_size.map(|s| s as usize),
652 chunk_index: e.chunk_index,
653 bytes_received: e.bytes_received as usize,
654 }
655 }
656
657 fn convert_response_headers_from_grpc(e: grpc::ResponseHeadersEvent) -> ResponseHeadersEvent {
659 ResponseHeadersEvent {
660 correlation_id: e.correlation_id,
661 status: e.status as u16,
662 headers: e.headers.into_iter().map(|(k, v)| (k, v.values)).collect(),
663 }
664 }
665
666 fn convert_response_body_chunk_from_grpc(
668 e: grpc::ResponseBodyChunkEvent,
669 ) -> ResponseBodyChunkEvent {
670 ResponseBodyChunkEvent {
671 correlation_id: e.correlation_id,
672 data: String::from_utf8_lossy(&e.data).to_string(),
673 is_last: e.is_last,
674 total_size: e.total_size.map(|s| s as usize),
675 chunk_index: e.chunk_index,
676 bytes_sent: e.bytes_sent as usize,
677 }
678 }
679
680 fn convert_request_complete_from_grpc(e: grpc::RequestCompleteEvent) -> RequestCompleteEvent {
682 RequestCompleteEvent {
683 correlation_id: e.correlation_id,
684 status: e.status as u16,
685 duration_ms: e.duration_ms,
686 request_body_size: e.request_body_size as usize,
687 response_body_size: e.response_body_size as usize,
688 upstream_attempts: e.upstream_attempts,
689 error: e.error,
690 }
691 }
692
693 fn convert_websocket_frame_from_grpc(e: grpc::WebSocketFrameEvent) -> WebSocketFrameEvent {
695 use base64::{engine::general_purpose::STANDARD, Engine as _};
696 WebSocketFrameEvent {
697 correlation_id: e.correlation_id,
698 opcode: e.opcode,
699 data: STANDARD.encode(&e.data),
700 client_to_server: e.client_to_server,
701 frame_index: e.frame_index,
702 fin: e.fin,
703 route_id: e.route_id,
704 client_ip: e.client_ip,
705 }
706 }
707
708 fn convert_metadata_from_grpc(metadata: Option<grpc::RequestMetadata>) -> RequestMetadata {
710 match metadata {
711 Some(m) => RequestMetadata {
712 correlation_id: m.correlation_id,
713 request_id: m.request_id,
714 client_ip: m.client_ip,
715 client_port: m.client_port as u16,
716 server_name: m.server_name,
717 protocol: m.protocol,
718 tls_version: m.tls_version,
719 tls_cipher: m.tls_cipher,
720 route_id: m.route_id,
721 upstream_id: m.upstream_id,
722 timestamp: m.timestamp,
723 },
724 None => RequestMetadata {
725 correlation_id: String::new(),
726 request_id: String::new(),
727 client_ip: String::new(),
728 client_port: 0,
729 server_name: None,
730 protocol: String::new(),
731 tls_version: None,
732 tls_cipher: None,
733 route_id: None,
734 upstream_id: None,
735 timestamp: String::new(),
736 },
737 }
738 }
739
740 fn convert_response_to_grpc(response: AgentResponse) -> grpc::AgentResponse {
742 let decision = match response.decision {
743 Decision::Allow => Some(grpc::agent_response::Decision::Allow(
744 grpc::AllowDecision {},
745 )),
746 Decision::Block {
747 status,
748 body,
749 headers,
750 } => Some(grpc::agent_response::Decision::Block(grpc::BlockDecision {
751 status: status as u32,
752 body,
753 headers: headers.unwrap_or_default(),
754 })),
755 Decision::Redirect { url, status } => Some(grpc::agent_response::Decision::Redirect(
756 grpc::RedirectDecision {
757 url,
758 status: status as u32,
759 },
760 )),
761 Decision::Challenge {
762 challenge_type,
763 params,
764 } => Some(grpc::agent_response::Decision::Challenge(
765 grpc::ChallengeDecision {
766 challenge_type,
767 params,
768 },
769 )),
770 };
771
772 let request_headers: Vec<grpc::HeaderOp> = response
773 .request_headers
774 .into_iter()
775 .map(Self::convert_header_op_to_grpc)
776 .collect();
777
778 let response_headers: Vec<grpc::HeaderOp> = response
779 .response_headers
780 .into_iter()
781 .map(Self::convert_header_op_to_grpc)
782 .collect();
783
784 let audit = Some(grpc::AuditMetadata {
785 tags: response.audit.tags,
786 rule_ids: response.audit.rule_ids,
787 confidence: response.audit.confidence,
788 reason_codes: response.audit.reason_codes,
789 custom: response
790 .audit
791 .custom
792 .into_iter()
793 .map(|(k, v)| (k, v.to_string()))
794 .collect(),
795 });
796
797 let request_body_mutation = response.request_body_mutation.map(|m| grpc::BodyMutation {
799 data: m.data.map(|d| d.into_bytes()),
800 chunk_index: m.chunk_index,
801 });
802
803 let response_body_mutation = response.response_body_mutation.map(|m| grpc::BodyMutation {
804 data: m.data.map(|d| d.into_bytes()),
805 chunk_index: m.chunk_index,
806 });
807
808 let websocket_decision = response
810 .websocket_decision
811 .map(|ws_decision| match ws_decision {
812 WebSocketDecision::Allow => {
813 grpc::agent_response::WebsocketDecision::WebsocketAllow(
814 grpc::WebSocketAllowDecision {},
815 )
816 }
817 WebSocketDecision::Drop => grpc::agent_response::WebsocketDecision::WebsocketDrop(
818 grpc::WebSocketDropDecision {},
819 ),
820 WebSocketDecision::Close { code, reason } => {
821 grpc::agent_response::WebsocketDecision::WebsocketClose(
822 grpc::WebSocketCloseDecision {
823 code: code as u32,
824 reason,
825 },
826 )
827 }
828 });
829
830 grpc::AgentResponse {
831 version: PROTOCOL_VERSION,
832 decision,
833 request_headers,
834 response_headers,
835 routing_metadata: response.routing_metadata,
836 audit,
837 needs_more: response.needs_more,
838 request_body_mutation,
839 response_body_mutation,
840 websocket_decision,
841 }
842 }
843
844 fn convert_header_op_to_grpc(op: HeaderOp) -> grpc::HeaderOp {
846 let operation = match op {
847 HeaderOp::Set { name, value } => {
848 Some(grpc::header_op::Operation::Set(grpc::SetHeader {
849 name,
850 value,
851 }))
852 }
853 HeaderOp::Add { name, value } => {
854 Some(grpc::header_op::Operation::Add(grpc::AddHeader {
855 name,
856 value,
857 }))
858 }
859 HeaderOp::Remove { name } => {
860 Some(grpc::header_op::Operation::Remove(grpc::RemoveHeader {
861 name,
862 }))
863 }
864 };
865 grpc::HeaderOp { operation }
866 }
867}