1pub mod batch;
40pub mod config;
41pub mod proto;
42pub mod token;
43
44pub use config::GrpcConfig;
45pub use token::GrpcToken;
46
47use super::error::{TransportError, TransportResult};
48use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
49use super::types::{Message, PayloadFormat, SendResult};
50use super::work_batch::{Record, WorkBatch};
51use std::collections::HashMap;
52use std::sync::Arc;
53use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
54use tokio::sync::{mpsc, oneshot};
55use tonic::{Request, Response, Status};
56
/// gRPC transport that can send to a remote peer, receive from a local
/// listener, or both, depending on how it was configured at construction.
pub struct GrpcTransport {
    /// Outbound client; `None` when no endpoint was configured.
    client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,

    /// Receiving half of the queue filled by the server handlers; `None` when
    /// no listen address was configured.
    receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,

    /// One-shot sender used to ask the server task to shut down. It is taken
    /// (left as `None`) by whichever of `close` or `Drop` runs first.
    shutdown_tx: parking_lot::Mutex<Option<oneshot::Sender<()>>>,

    /// Join handle of the spawned server task; stored but never awaited here.
    _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,

    /// Set by `close`; checked at the start of `send`, `send_batch` and `recv`.
    closed: AtomicBool,

    /// Health flag: `true` after construction, `false` after `close`. Shared
    /// with the health-registry callback when the `health` feature is on.
    healthy: Arc<AtomicBool>,

    /// How long `recv` waits for the first message, in milliseconds.
    /// `0` means `recv` never waits and only drains what is already queued.
    recv_timeout_ms: u64,

    /// Per-request timeout applied to outbound calls, in milliseconds.
    /// `0` means no timeout is set on the request.
    send_timeout_ms: u64,

    /// Upper bound checked against the summed payload bytes of a batch
    /// before it is sent.
    max_message_size: usize,

    /// Number of outbound requests currently awaiting a response.
    #[cfg(feature = "metrics")]
    inflight: AtomicU64,

    /// Inbound/outbound filter rules applied in `recv`, `send` and `send_batch`.
    filter_engine: super::filter::TransportFilterEngine,
}
99
100fn build_grpc_client_tls(
106 config: &GrpcConfig,
107) -> TransportResult<tonic::transport::ClientTlsConfig> {
108 use tonic::transport::{Certificate, ClientTlsConfig, Identity};
109
110 let mut tls = ClientTlsConfig::new();
111
112 if let Some(ref ca) = config.tls_ca_path {
113 let pem = std::fs::read(ca)
114 .map_err(|e| TransportError::Config(format!("gRPC TLS: cannot read ca {ca}: {e}")))?;
115 tls = tls.ca_certificate(Certificate::from_pem(pem));
116 } else {
117 tls = tls.with_native_roots();
119 }
120
121 if let Some(ref domain) = config.tls_domain {
122 tls = tls.domain_name(domain.clone());
123 }
124
125 match (&config.tls_client_cert_path, &config.tls_client_key_path) {
127 (Some(cert), Some(key)) => {
128 let cert_pem = std::fs::read(cert).map_err(|e| {
129 TransportError::Config(format!("gRPC TLS: cannot read client cert {cert}: {e}"))
130 })?;
131 let key_pem = std::fs::read(key).map_err(|e| {
132 TransportError::Config(format!("gRPC TLS: cannot read client key {key}: {e}"))
133 })?;
134 tls = tls.identity(Identity::from_pem(cert_pem, key_pem));
135 }
136 (None, None) => {}
137 _ => {
138 return Err(TransportError::Config(
139 "gRPC TLS: mTLS requires BOTH tls_client_cert_path and tls_client_key_path"
140 .to_string(),
141 ));
142 }
143 }
144
145 Ok(tls)
146}
147
148impl GrpcTransport {
149 pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
161 Self::new_inner(
162 config,
163 #[cfg(feature = "governor")]
164 None,
165 )
166 .await
167 }
168
169 #[cfg(feature = "governor")]
182 pub async fn with_pressure(
183 config: &GrpcConfig,
184 pressure: Option<Arc<crate::governor::UnifiedPressure>>,
185 ) -> TransportResult<Self> {
186 Self::new_inner(config, pressure).await
187 }
188
189 async fn new_inner(
190 config: &GrpcConfig,
191 #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
192 ) -> TransportResult<Self> {
193 let mut client = None;
194 let mut receiver = None;
195 let mut shutdown_tx = None;
196 let mut server_handle = None;
197 let sequence = Arc::new(AtomicU64::new(0));
198
199 if let Some(endpoint) = &config.endpoint {
201 let mut ep = tonic::transport::Channel::from_shared(endpoint.clone())
202 .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?;
203
204 if config.tls_enabled {
207 ep = ep
208 .tls_config(build_grpc_client_tls(config)?)
209 .map_err(|e| TransportError::Config(format!("gRPC TLS config: {e}")))?;
210 }
211
212 let channel = ep.connect_lazy();
213
214 let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
215 .max_decoding_message_size(config.max_message_size)
216 .max_encoding_message_size(config.max_message_size);
217
218 if config.compression {
219 c = c
220 .send_compressed(tonic::codec::CompressionEncoding::Gzip)
221 .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
222 }
223
224 client = Some(c);
225 }
226
227 if let Some(listen) = &config.listen {
229 let addr: std::net::SocketAddr = listen
230 .parse()
231 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
232
233 let (tx, rx) = mpsc::channel(config.recv_buffer_size);
234 let (sd_tx, sd_rx) = oneshot::channel();
235
236 let dfe_svc = DfeTransportServiceImpl {
238 sender: tx.clone(),
239 sequence: sequence.clone(),
240 #[cfg(feature = "metrics")]
241 server_inflight: Arc::new(AtomicU64::new(0)),
242 #[cfg(feature = "governor")]
243 pressure: pressure.clone(),
244 };
245
246 let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
247 .max_decoding_message_size(config.max_message_size)
248 .max_encoding_message_size(config.max_message_size)
249 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
250 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
251
252 let mut builder = tonic::transport::Server::builder();
254
255 #[cfg(feature = "transport-grpc-vector-compat")]
256 let router = if config.vector_compat {
257 let vector_svc =
258 super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
259 let vector_server =
260 super::vector_compat::proto::vector::vector_server::VectorServer::new(
261 vector_svc,
262 )
263 .max_decoding_message_size(config.max_message_size)
264 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
265 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
266
267 builder.add_service(dfe_server).add_service(vector_server)
268 } else {
269 builder.add_service(dfe_server)
270 };
271
272 #[cfg(not(feature = "transport-grpc-vector-compat"))]
273 let router = builder.add_service(dfe_server);
274
275 let listener = tokio::net::TcpListener::bind(addr)
280 .await
281 .map_err(|e| TransportError::Config(format!("failed to bind {addr}: {e}")))?;
282 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
283
284 let handle = tokio::spawn(async move {
285 router
286 .serve_with_incoming_shutdown(incoming, async {
287 sd_rx.await.ok();
288 })
289 .await
290 });
291
292 receiver = Some(tokio::sync::Mutex::new(rx));
293 shutdown_tx = Some(sd_tx);
294 server_handle = Some(handle);
295 } else {
296 #[cfg(feature = "governor")]
299 let _ = pressure;
300 }
301
302 let healthy = Arc::new(AtomicBool::new(true));
303
304 let filter_engine = super::filter::TransportFilterEngine::new(
305 &config.filters_in,
306 &config.filters_out,
307 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
308 )?;
309
310 #[cfg(feature = "health")]
311 {
312 let h = Arc::clone(&healthy);
313 crate::health::HealthRegistry::register("transport:grpc", move || {
314 if h.load(Ordering::Relaxed) {
315 crate::health::HealthStatus::Healthy
316 } else {
317 crate::health::HealthStatus::Unhealthy
318 }
319 });
320 }
321
322 Ok(Self {
323 client,
324 receiver,
325 shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
326 _server_handle: server_handle,
327 closed: AtomicBool::new(false),
328 healthy,
329 recv_timeout_ms: config.recv_timeout_ms,
330 send_timeout_ms: config.send_timeout_ms,
331 max_message_size: config.max_message_size,
332 #[cfg(feature = "metrics")]
333 inflight: AtomicU64::new(0),
334 filter_engine,
335 })
336 }
337}
338
339impl TransportSender for GrpcTransport {
340 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
341 if self.closed.load(Ordering::Relaxed) {
342 return SendResult::Fatal(TransportError::Closed);
343 }
344
345 if self.filter_engine.has_outbound_filters() {
347 match self.filter_engine.apply_outbound(&payload) {
348 super::filter::FilterDisposition::Pass => {}
349 super::filter::FilterDisposition::Drop => return SendResult::Ok,
350 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
351 }
352 }
353
354 let Some(client) = &self.client else {
355 return SendResult::Fatal(TransportError::Config(
356 "no endpoint configured for sending".into(),
357 ));
358 };
359
360 let mut metadata = HashMap::new();
361 if !key.is_empty() {
362 metadata.insert("topic".to_string(), key.to_string());
363 }
364
365 #[cfg(feature = "transport-trace")]
367 if let Some(tp) = super::propagation::current_traceparent() {
368 metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
369 }
370
371 let mut request = tonic::Request::new(proto::PushRequest {
372 payload,
374 format: proto::Format::Auto.into(),
375 metadata,
376 });
377
378 if self.send_timeout_ms > 0 {
381 request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
382 }
383
384 #[cfg(feature = "metrics")]
385 let start = std::time::Instant::now();
386
387 #[cfg(feature = "metrics")]
388 self.inflight.fetch_add(1, Ordering::Relaxed);
389
390 let result = match client.clone().push(request).await {
392 Ok(_) => {
393 #[cfg(feature = "metrics")]
394 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
395 SendResult::Ok
396 }
397 Err(status) => match status.code() {
398 tonic::Code::Unavailable
401 | tonic::Code::ResourceExhausted
402 | tonic::Code::DeadlineExceeded => {
403 #[cfg(feature = "metrics")]
404 metrics::counter!(
405 "dfe_transport_backpressured_total",
406 "transport" => "grpc"
407 )
408 .increment(1);
409 SendResult::Backpressured
410 }
411 _ => {
412 #[cfg(feature = "metrics")]
413 metrics::counter!(
414 "dfe_transport_send_errors_total",
415 "transport" => "grpc"
416 )
417 .increment(1);
418 SendResult::Fatal(TransportError::Send(status.message().to_string()))
419 }
420 },
421 };
422
423 #[cfg(feature = "metrics")]
424 {
425 self.inflight.fetch_sub(1, Ordering::Relaxed);
426 metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
427 .set(self.inflight.load(Ordering::Relaxed) as f64);
428 metrics::histogram!(
429 "dfe_transport_send_duration_seconds",
430 "transport" => "grpc"
431 )
432 .record(start.elapsed().as_secs_f64());
433 }
434
435 result
436 }
437
438 async fn send_batch(&self, records: &[Record]) -> SendResult {
461 if self.closed.load(Ordering::Relaxed) {
462 return SendResult::Fatal(TransportError::Closed);
463 }
464
465 let Some(client) = &self.client else {
466 return SendResult::Fatal(TransportError::Config(
467 "no endpoint configured for sending".into(),
468 ));
469 };
470
471 let to_send: Vec<Record> = if self.filter_engine.has_outbound_filters() {
475 let mut keep = Vec::with_capacity(records.len());
476 for r in records {
477 match self.filter_engine.apply_outbound(&r.payload) {
478 super::filter::FilterDisposition::Pass => keep.push(r.clone()),
479 super::filter::FilterDisposition::Drop
480 | super::filter::FilterDisposition::Dlq => {}
481 }
482 }
483 keep
484 } else {
485 records.to_vec()
486 };
487
488 if to_send.is_empty() {
490 return SendResult::Ok;
491 }
492 let sent_count = to_send.len();
493
494 let payload_bytes: usize = to_send.iter().map(|r| r.payload.len()).sum();
500 if payload_bytes > self.max_message_size {
501 #[cfg(feature = "metrics")]
502 metrics::counter!("dfe_transport_oversize_total", "transport" => "grpc").increment(1);
503 return SendResult::Fatal(TransportError::Config(format!(
504 "gRPC batch payload {payload_bytes} bytes exceeds max_message_size \
505 {} -- lower the self-regulation byte budget below the gRPC limit",
506 self.max_message_size
507 )));
508 }
509
510 let proto_batch = batch::records_to_proto(to_send);
512
513 let mut request = tonic::Request::new(proto_batch);
514
515 #[cfg(feature = "transport-trace")]
517 if let Some(tp) = super::propagation::current_traceparent()
518 && let Ok(val) = tp.parse()
519 {
520 request
521 .metadata_mut()
522 .insert(super::propagation::TRACEPARENT_HEADER, val);
523 }
524
525 if self.send_timeout_ms > 0 {
526 request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
527 }
528
529 #[cfg(feature = "metrics")]
530 let start = std::time::Instant::now();
531 #[cfg(feature = "metrics")]
532 self.inflight.fetch_add(1, Ordering::Relaxed);
533
534 let result = match client.clone().route_batch(request).await {
535 Ok(response) => {
536 let accepted = response.into_inner().accepted;
542 if accepted < sent_count as u64 {
543 #[cfg(feature = "metrics")]
544 metrics::counter!(
545 "dfe_transport_backpressured_total",
546 "transport" => "grpc"
547 )
548 .increment(1);
549 tracing::warn!(
550 accepted,
551 sent = sent_count,
552 "gRPC RouteBatch partially accepted -- retrying whole block"
553 );
554 SendResult::Backpressured
555 } else {
556 #[cfg(feature = "metrics")]
557 metrics::counter!(
558 "dfe_transport_sent_total",
559 "transport" => "grpc",
560 "path" => "batch"
561 )
562 .increment(sent_count as u64);
563 SendResult::Ok
564 }
565 }
566 Err(status) => match status.code() {
567 tonic::Code::Unavailable
568 | tonic::Code::ResourceExhausted
569 | tonic::Code::DeadlineExceeded => {
570 #[cfg(feature = "metrics")]
571 metrics::counter!(
572 "dfe_transport_backpressured_total",
573 "transport" => "grpc"
574 )
575 .increment(1);
576 SendResult::Backpressured
577 }
578 _ => {
579 #[cfg(feature = "metrics")]
580 metrics::counter!(
581 "dfe_transport_send_errors_total",
582 "transport" => "grpc"
583 )
584 .increment(1);
585 SendResult::Fatal(TransportError::Send(status.message().to_string()))
586 }
587 },
588 };
589
590 #[cfg(feature = "metrics")]
591 {
592 self.inflight.fetch_sub(1, Ordering::Relaxed);
593 metrics::histogram!(
594 "dfe_transport_send_duration_seconds",
595 "transport" => "grpc"
596 )
597 .record(start.elapsed().as_secs_f64());
598 }
599
600 result
601 }
602}
603
604impl TransportBase for GrpcTransport {
605 async fn close(&self) -> TransportResult<()> {
606 self.closed.store(true, Ordering::Relaxed);
607 self.healthy.store(false, Ordering::Relaxed);
608
609 if let Some(tx) = self.shutdown_tx.lock().take() {
613 let _ = tx.send(());
614 }
615 Ok(())
616 }
617
618 fn is_healthy(&self) -> bool {
619 let healthy = self.healthy.load(Ordering::Relaxed);
620 #[cfg(feature = "metrics")]
621 metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
622 1.0
623 } else {
624 0.0
625 });
626 healthy
627 }
628
629 fn name(&self) -> &'static str {
630 "grpc"
631 }
632}
633
634impl TransportReceiver for GrpcTransport {
635 type Token = GrpcToken;
636
637 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
638 if self.closed.load(Ordering::Relaxed) {
639 return Err(TransportError::Closed);
640 }
641
642 let Some(receiver) = &self.receiver else {
643 return Err(TransportError::Config(
644 "no listen address configured for receiving".into(),
645 ));
646 };
647
648 let mut rx = receiver.lock().await;
649 let mut messages = Vec::with_capacity(max.min(100));
650
651 for _ in 0..max {
652 let result = if self.recv_timeout_ms == 0 {
653 match rx.try_recv() {
655 Ok(msg) => Some(msg),
656 Err(mpsc::error::TryRecvError::Empty) => break,
657 Err(mpsc::error::TryRecvError::Disconnected) => {
658 return Err(TransportError::Closed);
659 }
660 }
661 } else if messages.is_empty() {
662 match tokio::time::timeout(
664 std::time::Duration::from_millis(self.recv_timeout_ms),
665 rx.recv(),
666 )
667 .await
668 {
669 Ok(Some(msg)) => Some(msg),
670 Ok(None) => return Err(TransportError::Closed),
671 Err(_) => break, }
673 } else {
674 match rx.try_recv() {
676 Ok(msg) => Some(msg),
677 Err(_) => break,
678 }
679 };
680
681 if let Some(msg) = result {
682 messages.push(msg);
683 }
684 }
685
686 let batch = self.filter_engine.partition_batch(
689 messages,
690 |m| m.payload.as_ref(),
691 |m| m.key.clone(),
692 |m| m.token.clone(),
693 );
694 let messages = batch.messages;
695 let dlq_entries = batch.dlq_entries;
696 let filtered_tokens = batch.filtered_tokens;
697
698 Ok(RecvBatch {
699 messages,
700 dlq_entries,
701 filtered_tokens,
702 }
703 .into())
704 }
705
706 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
707 Ok(())
710 }
711}
712
713impl Drop for GrpcTransport {
714 fn drop(&mut self) {
715 if let Some(tx) = self.shutdown_tx.lock().take() {
717 let _ = tx.send(());
718 }
719 }
721}
722
/// Guard that counts one in-flight server request in the wrapped counter for
/// as long as it is alive: incremented by `enter`, decremented on drop.
#[cfg(feature = "metrics")]
struct InflightGuard(Arc<AtomicU64>);
730
#[cfg(feature = "metrics")]
impl InflightGuard {
    /// Increments `counter`, publishes the new value to the
    /// `dfe_grpc_server_inflight_requests` gauge, and returns a guard that
    /// shares the counter.
    fn enter(counter: &Arc<AtomicU64>) -> Self {
        let before = counter.fetch_add(1, Ordering::Relaxed);
        let now_inflight = before + 1;
        metrics::gauge!("dfe_grpc_server_inflight_requests").set(now_inflight as f64);
        Self(Arc::clone(counter))
    }
}
739
#[cfg(feature = "metrics")]
impl Drop for InflightGuard {
    /// Decrements the shared counter and publishes the new value to the
    /// `dfe_grpc_server_inflight_requests` gauge.
    fn drop(&mut self) {
        let Self(counter) = self;
        let before = counter.fetch_sub(1, Ordering::Relaxed);
        // `fetch_sub` returns the previous value; saturate so the published
        // number can never wrap below zero.
        let remaining = before.saturating_sub(1);
        metrics::gauge!("dfe_grpc_server_inflight_requests").set(remaining as f64);
    }
}
747
/// Server-side implementation of the `DfeTransport` gRPC service; it forwards
/// accepted requests into the transport's receive queue.
struct DfeTransportServiceImpl {
    /// Producer half of the bounded queue drained by `GrpcTransport::recv`.
    sender: mpsc::Sender<Message<GrpcToken>>,
    /// Counter from which each accepted message's `GrpcToken` number is drawn.
    sequence: Arc<AtomicU64>,
    /// Number of requests currently inside a handler (tracked via `InflightGuard`).
    #[cfg(feature = "metrics")]
    server_inflight: Arc<AtomicU64>,
    /// Optional governor; while it reports `should_hold()`, handlers reject
    /// requests with `Unavailable` before touching the queue.
    #[cfg(feature = "governor")]
    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
}
766
767#[tonic::async_trait]
768impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
769 async fn push(
770 &self,
771 request: Request<proto::PushRequest>,
772 ) -> Result<Response<proto::PushResponse>, Status> {
773 #[cfg(feature = "metrics")]
775 let _inflight = InflightGuard::enter(&self.server_inflight);
776
777 #[cfg(feature = "governor")]
780 if let Some(pressure) = &self.pressure
781 && pressure.should_hold()
782 {
783 #[cfg(feature = "metrics")]
784 {
785 metrics::counter!(
786 "dfe_transport_backpressured_total",
787 "transport" => "grpc",
788 "reason" => "pressure"
789 )
790 .increment(1);
791 metrics::counter!("dfe_grpc_server_shed_total").increment(1);
793 }
794 return Err(Status::unavailable("under pressure -- inbound held"));
795 }
796
797 let req = request.into_inner();
798 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
799
800 #[cfg(feature = "transport-trace")]
802 if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
803 && super::propagation::is_valid_traceparent(tp)
804 {
805 tracing::Span::current().record("traceparent", tp.as_str());
806 }
807
808 let format = PayloadFormat::detect(&req.payload);
809 let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
810
811 let msg = Message {
814 key,
815 payload: req.payload,
816 token: GrpcToken::new(seq),
817 timestamp_ms: None,
818 format,
819 };
820
821 match self.sender.try_send(msg) {
822 Ok(()) => {
823 #[cfg(feature = "metrics")]
824 {
825 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
826 .increment(1);
827 metrics::counter!("dfe_transport_received_total", "transport" => "grpc")
830 .increment(1);
831 metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
832 self.sender
833 .max_capacity()
834 .saturating_sub(self.sender.capacity()) as f64,
835 );
836 }
837 Ok(Response::new(proto::PushResponse { accepted: 1 }))
838 }
839 Err(mpsc::error::TrySendError::Full(_)) => {
840 #[cfg(feature = "metrics")]
841 {
842 metrics::counter!(
843 "dfe_transport_backpressured_total",
844 "transport" => "grpc"
845 )
846 .increment(1);
847 metrics::counter!("dfe_grpc_server_shed_total").increment(1);
848 }
849 Err(Status::resource_exhausted("receiver buffer full"))
850 }
851 Err(mpsc::error::TrySendError::Closed(_)) => {
852 #[cfg(feature = "metrics")]
853 {
854 metrics::counter!(
855 "dfe_transport_refused_total",
856 "transport" => "grpc"
857 )
858 .increment(1);
859 metrics::counter!("dfe_grpc_server_shed_total").increment(1);
860 }
861 Err(Status::unavailable("receiver closed"))
862 }
863 }
864 }
865
866 async fn route_batch(
867 &self,
868 request: Request<proto::Batch>,
869 ) -> Result<Response<proto::BatchAck>, Status> {
870 #[cfg(feature = "metrics")]
872 let _inflight = InflightGuard::enter(&self.server_inflight);
873
874 #[cfg(feature = "governor")]
876 if let Some(pressure) = &self.pressure
877 && pressure.should_hold()
878 {
879 #[cfg(feature = "metrics")]
880 {
881 metrics::counter!(
882 "dfe_transport_backpressured_total",
883 "transport" => "grpc",
884 "reason" => "pressure"
885 )
886 .increment(1);
887 metrics::counter!("dfe_grpc_server_shed_total").increment(1);
888 }
889 return Err(Status::unavailable("under pressure -- inbound held"));
890 }
891
892 #[cfg(feature = "transport-trace")]
894 if let Some(tp) = request
895 .metadata()
896 .get(super::propagation::TRACEPARENT_HEADER)
897 .and_then(|v| v.to_str().ok())
898 && super::propagation::is_valid_traceparent(tp)
899 {
900 tracing::Span::current().record("traceparent", tp);
901 }
902
903 let proto_batch = request.into_inner();
904
905 let records = batch::proto_batch_to_records(proto_batch);
909 let accepted = records.len() as u64;
910
911 let permits = match self.sender.try_reserve_many(records.len()) {
918 Ok(permits) => permits,
919 Err(mpsc::error::TrySendError::Full(())) => {
920 #[cfg(feature = "metrics")]
921 {
922 metrics::counter!(
923 "dfe_transport_backpressured_total",
924 "transport" => "grpc"
925 )
926 .increment(1);
927 metrics::counter!("dfe_grpc_server_shed_total").increment(1);
928 }
929 return Err(Status::resource_exhausted("receiver buffer full"));
930 }
931 Err(mpsc::error::TrySendError::Closed(())) => {
932 #[cfg(feature = "metrics")]
933 {
934 metrics::counter!(
935 "dfe_transport_refused_total",
936 "transport" => "grpc"
937 )
938 .increment(1);
939 metrics::counter!("dfe_grpc_server_shed_total").increment(1);
940 }
941 return Err(Status::unavailable("receiver closed"));
942 }
943 };
944
945 for (permit, record) in permits.zip(records) {
947 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
948 let format = record.metadata.format;
949 let format = if format == PayloadFormat::Auto {
952 PayloadFormat::detect(&record.payload)
953 } else {
954 format
955 };
956
957 permit.send(Message {
958 key: record.key,
959 payload: record.payload,
960 token: GrpcToken::new(seq),
961 timestamp_ms: record.metadata.timestamp_ms,
962 format,
963 });
964 }
965
966 #[cfg(feature = "metrics")]
967 {
968 metrics::counter!(
969 "dfe_transport_sent_total",
970 "transport" => "grpc",
971 "path" => "batch"
972 )
973 .increment(accepted);
974 metrics::counter!("dfe_transport_received_total", "transport" => "grpc")
976 .increment(accepted);
977 }
978
979 Ok(Response::new(proto::BatchAck { accepted }))
980 }
981
982 async fn health_check(
983 &self,
984 _request: Request<proto::HealthCheckRequest>,
985 ) -> Result<Response<proto::HealthCheckResponse>, Status> {
986 Ok(Response::new(proto::HealthCheckResponse {
987 status: proto::ServingStatus::Serving.into(),
988 }))
989 }
990}
991
#[cfg(test)]
mod tests {
    use super::*;

    /// Tokens render as `grpc:<seq>` or `grpc:<source>:<seq>`.
    #[test]
    fn grpc_token_display() {
        let plain = GrpcToken::new(42);
        assert_eq!(plain.to_string(), "grpc:42");

        let sourced = GrpcToken::with_source(7, Arc::from("peer-1"));
        assert_eq!(sourced.to_string(), "grpc:peer-1:7");
    }

    /// Pins the documented defaults of `GrpcConfig`.
    #[test]
    fn grpc_config_defaults() {
        let defaults = GrpcConfig::default();

        // No role is enabled by default.
        assert!(defaults.listen.is_none());
        assert!(defaults.endpoint.is_none());

        // Sizing and timing.
        assert_eq!(defaults.recv_buffer_size, 10_000);
        assert_eq!(defaults.recv_timeout_ms, 100);
        assert_eq!(defaults.send_timeout_ms, 30_000);
        assert_eq!(defaults.max_message_size, 16 * 1024 * 1024);

        // Optional features are off.
        assert!(!defaults.compression);
        assert!(!defaults.tls_enabled);
        assert!(defaults.tls_ca_path.is_none());
    }

    /// A private CA builds fine; a client cert without its key is rejected.
    #[test]
    fn grpc_client_tls_builds_with_private_ca_and_rejects_half_mtls() {
        use std::io::Write;

        let generated = rcgen::generate_simple_self_signed(vec!["grpc.test".to_string()]).unwrap();
        let mut pem_file = tempfile::NamedTempFile::new().unwrap();
        pem_file
            .write_all(generated.cert.pem().as_bytes())
            .unwrap();
        pem_file.flush().unwrap();
        let pem_path = pem_file.path().to_string_lossy().into_owned();

        let private_ca = GrpcConfig {
            endpoint: Some("https://peer:6000".to_string()),
            tls_enabled: true,
            tls_ca_path: Some(pem_path.clone()),
            tls_domain: Some("grpc.test".to_string()),
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&private_ca).is_ok());

        let half_mtls = GrpcConfig {
            tls_enabled: true,
            tls_client_cert_path: Some(pem_path),
            tls_client_key_path: None,
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&half_mtls).is_err());
    }

    /// `GrpcConfig::server` sets only the listen address.
    #[test]
    fn grpc_config_server() {
        let server_cfg = GrpcConfig::server("0.0.0.0:6000");
        assert_eq!(server_cfg.listen.as_deref(), Some("0.0.0.0:6000"));
        assert!(server_cfg.endpoint.is_none());
    }

    /// `GrpcConfig::client` sets only the endpoint.
    #[test]
    fn grpc_config_client() {
        let client_cfg = GrpcConfig::client("http://loader:6000");
        assert!(client_cfg.listen.is_none());
        assert_eq!(client_cfg.endpoint.as_deref(), Some("http://loader:6000"));
    }

    /// A batch whose payload exceeds `max_message_size` fails fatally, and
    /// the error names the limit.
    #[tokio::test]
    async fn send_batch_rejects_oversize_block() {
        let tiny_limit = GrpcConfig::client("http://127.0.0.1:1").with_max_message_size(64);
        let transport = GrpcTransport::new(&tiny_limit).await.unwrap();

        let oversized = Record {
            payload: bytes::Bytes::from(vec![b'x'; 256]),
            key: None,
            headers: Vec::new(),
            metadata: crate::transport::work_batch::RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        };

        let outcome = transport.send_batch(&[oversized]).await;
        match outcome {
            SendResult::Fatal(e) => assert!(
                e.to_string().contains("max_message_size"),
                "error should name the limit, got: {e}"
            ),
            other => panic!("expected Fatal for oversize block, got {other:?}"),
        }
    }

    /// The compression builder switches the flag on.
    #[test]
    fn grpc_config_with_compression() {
        let compressed = GrpcConfig::server("0.0.0.0:6000").with_compression();
        assert!(compressed.compression);
    }

    /// A client-only transport has no receiver: `recv` errors, `commit` is a no-op.
    #[tokio::test]
    async fn grpc_transport_client_only() {
        let client_cfg = GrpcConfig::client("http://localhost:16000");
        let transport = GrpcTransport::new(&client_cfg).await.unwrap();

        assert!(transport.client.is_some());
        assert!(transport.receiver.is_none());
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "grpc");

        let received = transport.recv(10).await;
        assert!(received.is_err());

        transport.commit(&[]).await.unwrap();
    }

    /// With the governor pinned high, a push is shed by the server and the
    /// client reports it as backpressure.
    #[cfg(feature = "governor")]
    #[tokio::test]
    async fn grpc_pressure_high_rejects_unavailable() {
        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: 0.80,
            ..Default::default()
        }));
        // 950 of 1000 bytes: above the 0.80 threshold.
        guard.add_bytes(950);

        let sources =
            vec![Arc::new(MemoryPressureSource::new(Arc::clone(&guard))) as Arc<dyn PressureSource>];
        let pressure = Arc::new(UnifiedPressure::new(
            sources,
            Hysteresis::new(0.80, 0.65).expect("valid band"),
        ));
        assert!(pressure.should_hold(), "pinned-high governor must hold");

        let server_cfg = GrpcConfig::server("127.0.0.1:16077");
        let server = GrpcTransport::with_pressure(&server_cfg, Some(Arc::clone(&pressure)))
            .await
            .unwrap();
        // Give the spawned server task a moment to start serving.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        let client_cfg = GrpcConfig::client("http://127.0.0.1:16077");
        let client = GrpcTransport::new(&client_cfg).await.unwrap();
        let result = client
            .send("events", bytes::Bytes::from_static(b"{\"x\":1}"))
            .await;
        assert!(
            matches!(result, SendResult::Backpressured),
            "push under pressure must surface as backpressure, got {result:?}"
        );

        client.close().await.unwrap();
        server.close().await.unwrap();
    }

    /// A server-only transport has no client: `send` is fatal, and `close`
    /// flips the health flag.
    #[tokio::test]
    async fn grpc_transport_server_only() {
        let server_cfg = GrpcConfig::server("127.0.0.1:16001");
        let transport = GrpcTransport::new(&server_cfg).await.unwrap();

        assert!(transport.client.is_none());
        assert!(transport.receiver.is_some());
        assert!(transport.is_healthy());

        let sent = transport
            .send("test", bytes::Bytes::from_static(b"payload"))
            .await;
        assert!(sent.is_fatal());

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
    }
}