1pub mod batch;
40pub mod config;
41pub mod proto;
42pub mod token;
43
44pub use config::GrpcConfig;
45pub use token::GrpcToken;
46
47use super::error::{TransportError, TransportResult};
48use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
49use super::types::{Message, PayloadFormat, SendResult};
50use super::work_batch::{Record, WorkBatch};
51use std::collections::HashMap;
52use std::sync::Arc;
53use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
54use tokio::sync::{mpsc, oneshot};
55use tonic::{Request, Response, Status};
56
/// gRPC-backed transport that can act as a sender, a receiver, or both.
///
/// The active roles are fixed at construction: `client` is populated when the
/// config carries an `endpoint`, and `receiver` (together with the background
/// server task) when it carries a `listen` address.
pub struct GrpcTransport {
    /// Outbound client; `None` when no `endpoint` was configured.
    client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,

    /// Consumer side of the bounded queue filled by the embedded gRPC server;
    /// `None` when no `listen` address was configured.
    receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,

    /// One-shot trigger that asks the server task to shut down. It is taken
    /// and fired by `close` or by `Drop`, whichever gets to it first.
    shutdown_tx: parking_lot::Mutex<Option<oneshot::Sender<()>>>,

    /// Join handle of the spawned server task. Stored but not read by any code
    /// visible in this file.
    _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,

    /// Set by `close`; checked at the top of `send`, `send_batch` and `recv`.
    closed: AtomicBool,

    /// Health flag. Shared through the `Arc` with the health-registry callback
    /// when the `health` feature is enabled; cleared by `close`.
    healthy: Arc<AtomicBool>,

    /// How long `recv` waits for the first message, in milliseconds. `0` makes
    /// `recv` poll the queue without waiting.
    recv_timeout_ms: u64,

    /// Per-request deadline for sends, in milliseconds. `0` leaves the request
    /// without a timeout.
    send_timeout_ms: u64,

    /// Copied from the config; `send_batch` rejects batches whose summed
    /// payload bytes exceed this value.
    max_message_size: usize,

    /// Number of sends currently awaiting a response (metrics builds only).
    #[cfg(feature = "metrics")]
    inflight: AtomicU64,

    /// Filter rules applied to outbound payloads in the send paths and to
    /// received messages in `recv`.
    filter_engine: super::filter::TransportFilterEngine,
}
99
100fn build_grpc_client_tls(
106 config: &GrpcConfig,
107) -> TransportResult<tonic::transport::ClientTlsConfig> {
108 use tonic::transport::{Certificate, ClientTlsConfig, Identity};
109
110 let mut tls = ClientTlsConfig::new();
111
112 if let Some(ref ca) = config.tls_ca_path {
113 let pem = std::fs::read(ca)
114 .map_err(|e| TransportError::Config(format!("gRPC TLS: cannot read ca {ca}: {e}")))?;
115 tls = tls.ca_certificate(Certificate::from_pem(pem));
116 } else {
117 tls = tls.with_native_roots();
119 }
120
121 if let Some(ref domain) = config.tls_domain {
122 tls = tls.domain_name(domain.clone());
123 }
124
125 match (&config.tls_client_cert_path, &config.tls_client_key_path) {
127 (Some(cert), Some(key)) => {
128 let cert_pem = std::fs::read(cert).map_err(|e| {
129 TransportError::Config(format!("gRPC TLS: cannot read client cert {cert}: {e}"))
130 })?;
131 let key_pem = std::fs::read(key).map_err(|e| {
132 TransportError::Config(format!("gRPC TLS: cannot read client key {key}: {e}"))
133 })?;
134 tls = tls.identity(Identity::from_pem(cert_pem, key_pem));
135 }
136 (None, None) => {}
137 _ => {
138 return Err(TransportError::Config(
139 "gRPC TLS: mTLS requires BOTH tls_client_cert_path and tls_client_key_path"
140 .to_string(),
141 ));
142 }
143 }
144
145 Ok(tls)
146}
147
148impl GrpcTransport {
149 pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
161 Self::new_inner(
162 config,
163 #[cfg(feature = "governor")]
164 None,
165 )
166 .await
167 }
168
169 #[cfg(feature = "governor")]
182 pub async fn with_pressure(
183 config: &GrpcConfig,
184 pressure: Option<Arc<crate::governor::UnifiedPressure>>,
185 ) -> TransportResult<Self> {
186 Self::new_inner(config, pressure).await
187 }
188
189 async fn new_inner(
190 config: &GrpcConfig,
191 #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
192 ) -> TransportResult<Self> {
193 let mut client = None;
194 let mut receiver = None;
195 let mut shutdown_tx = None;
196 let mut server_handle = None;
197 let sequence = Arc::new(AtomicU64::new(0));
198
199 if let Some(endpoint) = &config.endpoint {
201 let mut ep = tonic::transport::Channel::from_shared(endpoint.clone())
202 .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?;
203
204 if config.tls_enabled {
207 ep = ep
208 .tls_config(build_grpc_client_tls(config)?)
209 .map_err(|e| TransportError::Config(format!("gRPC TLS config: {e}")))?;
210 }
211
212 let channel = ep.connect_lazy();
213
214 let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
215 .max_decoding_message_size(config.max_message_size)
216 .max_encoding_message_size(config.max_message_size);
217
218 if config.compression {
219 c = c
220 .send_compressed(tonic::codec::CompressionEncoding::Gzip)
221 .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
222 }
223
224 client = Some(c);
225 }
226
227 if let Some(listen) = &config.listen {
229 let addr: std::net::SocketAddr = listen
230 .parse()
231 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
232
233 let (tx, rx) = mpsc::channel(config.recv_buffer_size);
234 let (sd_tx, sd_rx) = oneshot::channel();
235
236 let dfe_svc = DfeTransportServiceImpl {
238 sender: tx.clone(),
239 sequence: sequence.clone(),
240 #[cfg(feature = "governor")]
241 pressure: pressure.clone(),
242 };
243
244 let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
245 .max_decoding_message_size(config.max_message_size)
246 .max_encoding_message_size(config.max_message_size)
247 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
248 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
249
250 let mut builder = tonic::transport::Server::builder();
252
253 #[cfg(feature = "transport-grpc-vector-compat")]
254 let router = if config.vector_compat {
255 let vector_svc =
256 super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
257 let vector_server =
258 super::vector_compat::proto::vector::vector_server::VectorServer::new(
259 vector_svc,
260 )
261 .max_decoding_message_size(config.max_message_size)
262 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
263 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
264
265 builder.add_service(dfe_server).add_service(vector_server)
266 } else {
267 builder.add_service(dfe_server)
268 };
269
270 #[cfg(not(feature = "transport-grpc-vector-compat"))]
271 let router = builder.add_service(dfe_server);
272
273 let listener = tokio::net::TcpListener::bind(addr)
278 .await
279 .map_err(|e| TransportError::Config(format!("failed to bind {addr}: {e}")))?;
280 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
281
282 let handle = tokio::spawn(async move {
283 router
284 .serve_with_incoming_shutdown(incoming, async {
285 sd_rx.await.ok();
286 })
287 .await
288 });
289
290 receiver = Some(tokio::sync::Mutex::new(rx));
291 shutdown_tx = Some(sd_tx);
292 server_handle = Some(handle);
293 } else {
294 #[cfg(feature = "governor")]
297 let _ = pressure;
298 }
299
300 let healthy = Arc::new(AtomicBool::new(true));
301
302 let filter_engine = super::filter::TransportFilterEngine::new(
303 &config.filters_in,
304 &config.filters_out,
305 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
306 )?;
307
308 #[cfg(feature = "health")]
309 {
310 let h = Arc::clone(&healthy);
311 crate::health::HealthRegistry::register("transport:grpc", move || {
312 if h.load(Ordering::Relaxed) {
313 crate::health::HealthStatus::Healthy
314 } else {
315 crate::health::HealthStatus::Unhealthy
316 }
317 });
318 }
319
320 Ok(Self {
321 client,
322 receiver,
323 shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
324 _server_handle: server_handle,
325 closed: AtomicBool::new(false),
326 healthy,
327 recv_timeout_ms: config.recv_timeout_ms,
328 send_timeout_ms: config.send_timeout_ms,
329 max_message_size: config.max_message_size,
330 #[cfg(feature = "metrics")]
331 inflight: AtomicU64::new(0),
332 filter_engine,
333 })
334 }
335}
336
337impl TransportSender for GrpcTransport {
338 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
339 if self.closed.load(Ordering::Relaxed) {
340 return SendResult::Fatal(TransportError::Closed);
341 }
342
343 if self.filter_engine.has_outbound_filters() {
345 match self.filter_engine.apply_outbound(&payload) {
346 super::filter::FilterDisposition::Pass => {}
347 super::filter::FilterDisposition::Drop => return SendResult::Ok,
348 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
349 }
350 }
351
352 let Some(client) = &self.client else {
353 return SendResult::Fatal(TransportError::Config(
354 "no endpoint configured for sending".into(),
355 ));
356 };
357
358 let mut metadata = HashMap::new();
359 if !key.is_empty() {
360 metadata.insert("topic".to_string(), key.to_string());
361 }
362
363 #[cfg(feature = "transport-trace")]
365 if let Some(tp) = super::propagation::current_traceparent() {
366 metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
367 }
368
369 let mut request = tonic::Request::new(proto::PushRequest {
370 payload,
372 format: proto::Format::Auto.into(),
373 metadata,
374 });
375
376 if self.send_timeout_ms > 0 {
379 request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
380 }
381
382 #[cfg(feature = "metrics")]
383 let start = std::time::Instant::now();
384
385 #[cfg(feature = "metrics")]
386 self.inflight.fetch_add(1, Ordering::Relaxed);
387
388 let result = match client.clone().push(request).await {
390 Ok(_) => {
391 #[cfg(feature = "metrics")]
392 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
393 SendResult::Ok
394 }
395 Err(status) => match status.code() {
396 tonic::Code::Unavailable
399 | tonic::Code::ResourceExhausted
400 | tonic::Code::DeadlineExceeded => {
401 #[cfg(feature = "metrics")]
402 metrics::counter!(
403 "dfe_transport_backpressured_total",
404 "transport" => "grpc"
405 )
406 .increment(1);
407 SendResult::Backpressured
408 }
409 _ => {
410 #[cfg(feature = "metrics")]
411 metrics::counter!(
412 "dfe_transport_send_errors_total",
413 "transport" => "grpc"
414 )
415 .increment(1);
416 SendResult::Fatal(TransportError::Send(status.message().to_string()))
417 }
418 },
419 };
420
421 #[cfg(feature = "metrics")]
422 {
423 self.inflight.fetch_sub(1, Ordering::Relaxed);
424 metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
425 .set(self.inflight.load(Ordering::Relaxed) as f64);
426 metrics::histogram!(
427 "dfe_transport_send_duration_seconds",
428 "transport" => "grpc"
429 )
430 .record(start.elapsed().as_secs_f64());
431 }
432
433 result
434 }
435
436 async fn send_batch(&self, records: &[Record]) -> SendResult {
459 if self.closed.load(Ordering::Relaxed) {
460 return SendResult::Fatal(TransportError::Closed);
461 }
462
463 let Some(client) = &self.client else {
464 return SendResult::Fatal(TransportError::Config(
465 "no endpoint configured for sending".into(),
466 ));
467 };
468
469 let to_send: Vec<Record> = if self.filter_engine.has_outbound_filters() {
473 let mut keep = Vec::with_capacity(records.len());
474 for r in records {
475 match self.filter_engine.apply_outbound(&r.payload) {
476 super::filter::FilterDisposition::Pass => keep.push(r.clone()),
477 super::filter::FilterDisposition::Drop
478 | super::filter::FilterDisposition::Dlq => {}
479 }
480 }
481 keep
482 } else {
483 records.to_vec()
484 };
485
486 if to_send.is_empty() {
488 return SendResult::Ok;
489 }
490 let sent_count = to_send.len();
491
492 let payload_bytes: usize = to_send.iter().map(|r| r.payload.len()).sum();
498 if payload_bytes > self.max_message_size {
499 #[cfg(feature = "metrics")]
500 metrics::counter!("dfe_transport_oversize_total", "transport" => "grpc").increment(1);
501 return SendResult::Fatal(TransportError::Config(format!(
502 "gRPC batch payload {payload_bytes} bytes exceeds max_message_size \
503 {} -- lower the self-regulation byte budget below the gRPC limit",
504 self.max_message_size
505 )));
506 }
507
508 let proto_batch = batch::records_to_proto(to_send);
510
511 let mut request = tonic::Request::new(proto_batch);
512
513 #[cfg(feature = "transport-trace")]
515 if let Some(tp) = super::propagation::current_traceparent()
516 && let Ok(val) = tp.parse()
517 {
518 request
519 .metadata_mut()
520 .insert(super::propagation::TRACEPARENT_HEADER, val);
521 }
522
523 if self.send_timeout_ms > 0 {
524 request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
525 }
526
527 #[cfg(feature = "metrics")]
528 let start = std::time::Instant::now();
529 #[cfg(feature = "metrics")]
530 self.inflight.fetch_add(1, Ordering::Relaxed);
531
532 let result = match client.clone().route_batch(request).await {
533 Ok(response) => {
534 let accepted = response.into_inner().accepted;
540 if accepted < sent_count as u64 {
541 #[cfg(feature = "metrics")]
542 metrics::counter!(
543 "dfe_transport_backpressured_total",
544 "transport" => "grpc"
545 )
546 .increment(1);
547 tracing::warn!(
548 accepted,
549 sent = sent_count,
550 "gRPC RouteBatch partially accepted -- retrying whole block"
551 );
552 SendResult::Backpressured
553 } else {
554 #[cfg(feature = "metrics")]
555 metrics::counter!(
556 "dfe_transport_sent_total",
557 "transport" => "grpc",
558 "path" => "batch"
559 )
560 .increment(sent_count as u64);
561 SendResult::Ok
562 }
563 }
564 Err(status) => match status.code() {
565 tonic::Code::Unavailable
566 | tonic::Code::ResourceExhausted
567 | tonic::Code::DeadlineExceeded => {
568 #[cfg(feature = "metrics")]
569 metrics::counter!(
570 "dfe_transport_backpressured_total",
571 "transport" => "grpc"
572 )
573 .increment(1);
574 SendResult::Backpressured
575 }
576 _ => {
577 #[cfg(feature = "metrics")]
578 metrics::counter!(
579 "dfe_transport_send_errors_total",
580 "transport" => "grpc"
581 )
582 .increment(1);
583 SendResult::Fatal(TransportError::Send(status.message().to_string()))
584 }
585 },
586 };
587
588 #[cfg(feature = "metrics")]
589 {
590 self.inflight.fetch_sub(1, Ordering::Relaxed);
591 metrics::histogram!(
592 "dfe_transport_send_duration_seconds",
593 "transport" => "grpc"
594 )
595 .record(start.elapsed().as_secs_f64());
596 }
597
598 result
599 }
600}
601
602impl TransportBase for GrpcTransport {
603 async fn close(&self) -> TransportResult<()> {
604 self.closed.store(true, Ordering::Relaxed);
605 self.healthy.store(false, Ordering::Relaxed);
606
607 if let Some(tx) = self.shutdown_tx.lock().take() {
611 let _ = tx.send(());
612 }
613 Ok(())
614 }
615
616 fn is_healthy(&self) -> bool {
617 let healthy = self.healthy.load(Ordering::Relaxed);
618 #[cfg(feature = "metrics")]
619 metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
620 1.0
621 } else {
622 0.0
623 });
624 healthy
625 }
626
627 fn name(&self) -> &'static str {
628 "grpc"
629 }
630}
631
632impl TransportReceiver for GrpcTransport {
633 type Token = GrpcToken;
634
635 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
636 if self.closed.load(Ordering::Relaxed) {
637 return Err(TransportError::Closed);
638 }
639
640 let Some(receiver) = &self.receiver else {
641 return Err(TransportError::Config(
642 "no listen address configured for receiving".into(),
643 ));
644 };
645
646 let mut rx = receiver.lock().await;
647 let mut messages = Vec::with_capacity(max.min(100));
648
649 for _ in 0..max {
650 let result = if self.recv_timeout_ms == 0 {
651 match rx.try_recv() {
653 Ok(msg) => Some(msg),
654 Err(mpsc::error::TryRecvError::Empty) => break,
655 Err(mpsc::error::TryRecvError::Disconnected) => {
656 return Err(TransportError::Closed);
657 }
658 }
659 } else if messages.is_empty() {
660 match tokio::time::timeout(
662 std::time::Duration::from_millis(self.recv_timeout_ms),
663 rx.recv(),
664 )
665 .await
666 {
667 Ok(Some(msg)) => Some(msg),
668 Ok(None) => return Err(TransportError::Closed),
669 Err(_) => break, }
671 } else {
672 match rx.try_recv() {
674 Ok(msg) => Some(msg),
675 Err(_) => break,
676 }
677 };
678
679 if let Some(msg) = result {
680 messages.push(msg);
681 }
682 }
683
684 let batch = self.filter_engine.partition_batch(
687 messages,
688 |m| m.payload.as_ref(),
689 |m| m.key.clone(),
690 |m| m.token.clone(),
691 );
692 let messages = batch.messages;
693 let dlq_entries = batch.dlq_entries;
694 let filtered_tokens = batch.filtered_tokens;
695
696 Ok(RecvBatch {
697 messages,
698 dlq_entries,
699 filtered_tokens,
700 }
701 .into())
702 }
703
704 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
705 Ok(())
708 }
709}
710
711impl Drop for GrpcTransport {
712 fn drop(&mut self) {
713 if let Some(tx) = self.shutdown_tx.lock().take() {
715 let _ = tx.send(());
716 }
717 }
719}
720
/// Server-side handler for the `DfeTransport` gRPC service.
///
/// Each accepted request is converted into `Message<GrpcToken>` values and
/// pushed, without waiting, into the bounded queue behind `sender`.
struct DfeTransportServiceImpl {
    /// Producer side of the bounded message queue.
    sender: mpsc::Sender<Message<GrpcToken>>,
    /// Counter used to number the tokens of accepted messages; held in an
    /// `Arc` so it can be shared.
    sequence: Arc<AtomicU64>,
    /// Optional pressure governor; while it reports `should_hold()`, inbound
    /// requests are refused with `Unavailable`.
    #[cfg(feature = "governor")]
    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
}
735
736#[tonic::async_trait]
737impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
738 async fn push(
739 &self,
740 request: Request<proto::PushRequest>,
741 ) -> Result<Response<proto::PushResponse>, Status> {
742 #[cfg(feature = "governor")]
745 if let Some(pressure) = &self.pressure
746 && pressure.should_hold()
747 {
748 #[cfg(feature = "metrics")]
749 metrics::counter!(
750 "dfe_transport_backpressured_total",
751 "transport" => "grpc",
752 "reason" => "pressure"
753 )
754 .increment(1);
755 return Err(Status::unavailable("under pressure -- inbound held"));
756 }
757
758 let req = request.into_inner();
759 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
760
761 #[cfg(feature = "transport-trace")]
763 if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
764 && super::propagation::is_valid_traceparent(tp)
765 {
766 tracing::Span::current().record("traceparent", tp.as_str());
767 }
768
769 let format = PayloadFormat::detect(&req.payload);
770 let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
771
772 let msg = Message {
775 key,
776 payload: req.payload,
777 token: GrpcToken::new(seq),
778 timestamp_ms: None,
779 format,
780 };
781
782 match self.sender.try_send(msg) {
783 Ok(()) => {
784 #[cfg(feature = "metrics")]
785 {
786 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
787 .increment(1);
788 metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
789 self.sender
790 .max_capacity()
791 .saturating_sub(self.sender.capacity()) as f64,
792 );
793 }
794 Ok(Response::new(proto::PushResponse { accepted: 1 }))
795 }
796 Err(mpsc::error::TrySendError::Full(_)) => {
797 #[cfg(feature = "metrics")]
798 metrics::counter!(
799 "dfe_transport_backpressured_total",
800 "transport" => "grpc"
801 )
802 .increment(1);
803 Err(Status::resource_exhausted("receiver buffer full"))
804 }
805 Err(mpsc::error::TrySendError::Closed(_)) => {
806 #[cfg(feature = "metrics")]
807 metrics::counter!(
808 "dfe_transport_refused_total",
809 "transport" => "grpc"
810 )
811 .increment(1);
812 Err(Status::unavailable("receiver closed"))
813 }
814 }
815 }
816
817 async fn route_batch(
818 &self,
819 request: Request<proto::Batch>,
820 ) -> Result<Response<proto::BatchAck>, Status> {
821 #[cfg(feature = "governor")]
823 if let Some(pressure) = &self.pressure
824 && pressure.should_hold()
825 {
826 #[cfg(feature = "metrics")]
827 metrics::counter!(
828 "dfe_transport_backpressured_total",
829 "transport" => "grpc",
830 "reason" => "pressure"
831 )
832 .increment(1);
833 return Err(Status::unavailable("under pressure -- inbound held"));
834 }
835
836 #[cfg(feature = "transport-trace")]
838 if let Some(tp) = request
839 .metadata()
840 .get(super::propagation::TRACEPARENT_HEADER)
841 .and_then(|v| v.to_str().ok())
842 && super::propagation::is_valid_traceparent(tp)
843 {
844 tracing::Span::current().record("traceparent", tp);
845 }
846
847 let proto_batch = request.into_inner();
848
849 let records = batch::proto_batch_to_records(proto_batch);
853 let accepted = records.len() as u64;
854
855 let permits = match self.sender.try_reserve_many(records.len()) {
862 Ok(permits) => permits,
863 Err(mpsc::error::TrySendError::Full(())) => {
864 #[cfg(feature = "metrics")]
865 metrics::counter!(
866 "dfe_transport_backpressured_total",
867 "transport" => "grpc"
868 )
869 .increment(1);
870 return Err(Status::resource_exhausted("receiver buffer full"));
871 }
872 Err(mpsc::error::TrySendError::Closed(())) => {
873 #[cfg(feature = "metrics")]
874 metrics::counter!(
875 "dfe_transport_refused_total",
876 "transport" => "grpc"
877 )
878 .increment(1);
879 return Err(Status::unavailable("receiver closed"));
880 }
881 };
882
883 for (permit, record) in permits.zip(records) {
885 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
886 let format = record.metadata.format;
887 let format = if format == PayloadFormat::Auto {
890 PayloadFormat::detect(&record.payload)
891 } else {
892 format
893 };
894
895 permit.send(Message {
896 key: record.key,
897 payload: record.payload,
898 token: GrpcToken::new(seq),
899 timestamp_ms: record.metadata.timestamp_ms,
900 format,
901 });
902 }
903
904 #[cfg(feature = "metrics")]
905 metrics::counter!(
906 "dfe_transport_sent_total",
907 "transport" => "grpc",
908 "path" => "batch"
909 )
910 .increment(accepted);
911
912 Ok(Response::new(proto::BatchAck { accepted }))
913 }
914
915 async fn health_check(
916 &self,
917 _request: Request<proto::HealthCheckRequest>,
918 ) -> Result<Response<proto::HealthCheckResponse>, Status> {
919 Ok(Response::new(proto::HealthCheckResponse {
920 status: proto::ServingStatus::Serving.into(),
921 }))
922 }
923}
924
// Unit tests for the gRPC transport: token formatting, config builders, TLS
// assembly, and basic client-only / server-only behaviour.
#[cfg(test)]
mod tests {
    use super::*;

    // `Display` output of a token, with and without a source label.
    #[test]
    fn grpc_token_display() {
        let token = GrpcToken::new(42);
        assert_eq!(format!("{token}"), "grpc:42");

        let token = GrpcToken::with_source(7, Arc::from("peer-1"));
        assert_eq!(format!("{token}"), "grpc:peer-1:7");
    }

    // Pins the default values of `GrpcConfig`.
    #[test]
    fn grpc_config_defaults() {
        let config = GrpcConfig::default();
        assert!(config.listen.is_none());
        assert!(config.endpoint.is_none());
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
        assert_eq!(config.send_timeout_ms, 30_000);
        assert_eq!(config.max_message_size, 16 * 1024 * 1024);
        assert!(!config.compression);
        assert!(!config.tls_enabled);
        assert!(config.tls_ca_path.is_none());
    }

    // A private CA file is accepted; a client cert without its key is not.
    #[test]
    fn grpc_client_tls_builds_with_private_ca_and_rejects_half_mtls() {
        use std::io::Write;
        // Self-signed certificate written to a temp file to stand in for a CA.
        let cert = rcgen::generate_simple_self_signed(vec!["grpc.test".to_string()]).unwrap();
        let mut ca = tempfile::NamedTempFile::new().unwrap();
        ca.write_all(cert.cert.pem().as_bytes()).unwrap();
        ca.flush().unwrap();

        // Case 1: CA path + domain override builds successfully.
        let cfg = GrpcConfig {
            endpoint: Some("https://peer:6000".to_string()),
            tls_enabled: true,
            tls_ca_path: Some(ca.path().to_string_lossy().into_owned()),
            tls_domain: Some("grpc.test".to_string()),
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&cfg).is_ok());

        // Case 2: only half of the mTLS pair is supplied -> config error.
        let cfg = GrpcConfig {
            tls_enabled: true,
            tls_client_cert_path: Some(ca.path().to_string_lossy().into_owned()),
            tls_client_key_path: None,
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&cfg).is_err());
    }

    // `GrpcConfig::server` sets only the listen address.
    #[test]
    fn grpc_config_server() {
        let config = GrpcConfig::server("0.0.0.0:6000");
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:6000"));
        assert!(config.endpoint.is_none());
    }

    // `GrpcConfig::client` sets only the endpoint.
    #[test]
    fn grpc_config_client() {
        let config = GrpcConfig::client("http://loader:6000");
        assert!(config.listen.is_none());
        assert_eq!(config.endpoint.as_deref(), Some("http://loader:6000"));
    }

    // A batch whose payload exceeds `max_message_size` is rejected locally,
    // before any RPC is attempted.
    #[tokio::test]
    async fn send_batch_rejects_oversize_block() {
        // 64-byte limit against a 256-byte payload.
        let config = GrpcConfig::client("http://127.0.0.1:1").with_max_message_size(64);
        let transport = GrpcTransport::new(&config).await.unwrap();
        let rec = Record {
            payload: bytes::Bytes::from(vec![b'x'; 256]),
            key: None,
            headers: Vec::new(),
            metadata: crate::transport::work_batch::RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        };
        match transport.send_batch(&[rec]).await {
            SendResult::Fatal(e) => assert!(
                e.to_string().contains("max_message_size"),
                "error should name the limit, got: {e}"
            ),
            other => panic!("expected Fatal for oversize block, got {other:?}"),
        }
    }

    // `with_compression` turns the compression flag on.
    #[test]
    fn grpc_config_with_compression() {
        let config = GrpcConfig::server("0.0.0.0:6000").with_compression();
        assert!(config.compression);
    }

    // Client-only transport: has a client, no receiver, and cannot `recv`.
    #[tokio::test]
    async fn grpc_transport_client_only() {
        let config = GrpcConfig::client("http://localhost:16000");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_some());
        assert!(transport.receiver.is_none());
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "grpc");

        // No listen address was configured, so receiving must fail.
        let result = transport.recv(10).await;
        assert!(result.is_err());

        // `commit` succeeds even with no tokens.
        transport.commit(&[]).await.unwrap();
    }

    // With the governor pinned high, the server refuses pushes and the client
    // surfaces that as backpressure. Uses a fixed local port (16077).
    #[cfg(feature = "governor")]
    #[tokio::test]
    async fn grpc_pressure_high_rejects_unavailable() {
        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: 0.80,
            ..Default::default()
        }));
        // 950 of 1000 bytes puts usage above the 0.80 threshold.
        guard.add_bytes(950);
        let pressure = Arc::new(UnifiedPressure::new(
            vec![Arc::new(MemoryPressureSource::new(Arc::clone(&guard))) as Arc<dyn PressureSource>],
            Hysteresis::new(0.80, 0.65).expect("valid band"),
        ));
        assert!(pressure.should_hold(), "pinned-high governor must hold");

        let server_cfg = GrpcConfig::server("127.0.0.1:16077");
        let server = GrpcTransport::with_pressure(&server_cfg, Some(Arc::clone(&pressure)))
            .await
            .unwrap();
        // Brief pause before the client connects to the freshly spawned server.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        let client_cfg = GrpcConfig::client("http://127.0.0.1:16077");
        let client = GrpcTransport::new(&client_cfg).await.unwrap();
        let result = client
            .send("events", bytes::Bytes::from_static(b"{\"x\":1}"))
            .await;
        assert!(
            matches!(result, SendResult::Backpressured),
            "push under pressure must surface as backpressure, got {result:?}"
        );

        client.close().await.unwrap();
        server.close().await.unwrap();
    }

    // Server-only transport: has a receiver, no client, cannot `send`, and
    // reports unhealthy after `close`. Uses a fixed local port (16001).
    #[tokio::test]
    async fn grpc_transport_server_only() {
        let config = GrpcConfig::server("127.0.0.1:16001");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_none());
        assert!(transport.receiver.is_some());
        assert!(transport.is_healthy());

        // No endpoint was configured, so sending must be fatal.
        let result = transport
            .send("test", bytes::Bytes::from_static(b"payload"))
            .await;
        assert!(result.is_fatal());

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
    }
}