hyperi_rustlib/transport/grpc/
mod.rs1pub mod config;
40pub mod proto;
41pub mod token;
42
43pub use config::GrpcConfig;
44pub use token::GrpcToken;
45
46use super::error::{TransportError, TransportResult};
47use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
48use super::types::{Message, PayloadFormat, SendResult};
49use std::collections::HashMap;
50use std::sync::Arc;
51use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
52use tokio::sync::{mpsc, oneshot};
53use tonic::{Request, Response, Status};
54
/// gRPC transport that can push to a remote `endpoint` (client half), accept
/// pushes on a `listen` address (server half), or both.
///
/// Which halves are active is decided in [`GrpcTransport::new`] from
/// `GrpcConfig::endpoint` / `GrpcConfig::listen`.
pub struct GrpcTransport {
    /// Client used by `send`; `None` when no endpoint is configured.
    client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,

    /// Consumer side of the bounded queue the embedded server pushes into;
    /// `None` when no listen address is configured.
    receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,

    /// One-shot trigger for the server's graceful shutdown; taken and fired by
    /// `close` or by `Drop`.
    shutdown_tx: parking_lot::Mutex<Option<oneshot::Sender<()>>>,

    /// Handle of the spawned server task. It is held but never awaited in this
    /// module; dropping a tokio `JoinHandle` detaches the task.
    _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,

    /// Set by `close`; `send` and `recv` then fail with `TransportError::Closed`.
    closed: AtomicBool,

    /// Health flag reported by `is_healthy`; an `Arc` so the health-registry
    /// callback (feature `health`) can hold its own clone.
    healthy: Arc<AtomicBool>,

    /// How long `recv` waits for the first message, in ms; 0 = never wait.
    recv_timeout_ms: u64,

    /// Per-request deadline applied to outbound pushes, in ms; 0 = none set.
    send_timeout_ms: u64,

    /// Number of `send` calls currently awaiting a response (metrics only).
    #[cfg(feature = "metrics")]
    inflight: AtomicU64,

    /// Inbound (`recv`) and outbound (`send`) payload filters.
    filter_engine: super::filter::TransportFilterEngine,
}
92
93fn build_grpc_client_tls(
99 config: &GrpcConfig,
100) -> TransportResult<tonic::transport::ClientTlsConfig> {
101 use tonic::transport::{Certificate, ClientTlsConfig, Identity};
102
103 let mut tls = ClientTlsConfig::new();
104
105 if let Some(ref ca) = config.tls_ca_path {
106 let pem = std::fs::read(ca)
107 .map_err(|e| TransportError::Config(format!("gRPC TLS: cannot read ca {ca}: {e}")))?;
108 tls = tls.ca_certificate(Certificate::from_pem(pem));
109 } else {
110 tls = tls.with_native_roots();
112 }
113
114 if let Some(ref domain) = config.tls_domain {
115 tls = tls.domain_name(domain.clone());
116 }
117
118 match (&config.tls_client_cert_path, &config.tls_client_key_path) {
120 (Some(cert), Some(key)) => {
121 let cert_pem = std::fs::read(cert).map_err(|e| {
122 TransportError::Config(format!("gRPC TLS: cannot read client cert {cert}: {e}"))
123 })?;
124 let key_pem = std::fs::read(key).map_err(|e| {
125 TransportError::Config(format!("gRPC TLS: cannot read client key {key}: {e}"))
126 })?;
127 tls = tls.identity(Identity::from_pem(cert_pem, key_pem));
128 }
129 (None, None) => {}
130 _ => {
131 return Err(TransportError::Config(
132 "gRPC TLS: mTLS requires BOTH tls_client_cert_path and tls_client_key_path"
133 .to_string(),
134 ));
135 }
136 }
137
138 Ok(tls)
139}
140
141impl GrpcTransport {
142 pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
154 let mut client = None;
155 let mut receiver = None;
156 let mut shutdown_tx = None;
157 let mut server_handle = None;
158 let sequence = Arc::new(AtomicU64::new(0));
159
160 if let Some(endpoint) = &config.endpoint {
162 let mut ep = tonic::transport::Channel::from_shared(endpoint.clone())
163 .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?;
164
165 if config.tls_enabled {
168 ep = ep
169 .tls_config(build_grpc_client_tls(config)?)
170 .map_err(|e| TransportError::Config(format!("gRPC TLS config: {e}")))?;
171 }
172
173 let channel = ep.connect_lazy();
174
175 let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
176 .max_decoding_message_size(config.max_message_size)
177 .max_encoding_message_size(config.max_message_size);
178
179 if config.compression {
180 c = c
181 .send_compressed(tonic::codec::CompressionEncoding::Gzip)
182 .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
183 }
184
185 client = Some(c);
186 }
187
188 if let Some(listen) = &config.listen {
190 let addr: std::net::SocketAddr = listen
191 .parse()
192 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
193
194 let (tx, rx) = mpsc::channel(config.recv_buffer_size);
195 let (sd_tx, sd_rx) = oneshot::channel();
196
197 let dfe_svc = DfeTransportServiceImpl {
199 sender: tx.clone(),
200 sequence: sequence.clone(),
201 };
202
203 let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
204 .max_decoding_message_size(config.max_message_size)
205 .max_encoding_message_size(config.max_message_size)
206 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
207 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
208
209 let mut builder = tonic::transport::Server::builder();
211
212 #[cfg(feature = "transport-grpc-vector-compat")]
213 let router = if config.vector_compat {
214 let vector_svc =
215 super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
216 let vector_server =
217 super::vector_compat::proto::vector::vector_server::VectorServer::new(
218 vector_svc,
219 )
220 .max_decoding_message_size(config.max_message_size)
221 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
222 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
223
224 builder.add_service(dfe_server).add_service(vector_server)
225 } else {
226 builder.add_service(dfe_server)
227 };
228
229 #[cfg(not(feature = "transport-grpc-vector-compat"))]
230 let router = builder.add_service(dfe_server);
231
232 let listener = tokio::net::TcpListener::bind(addr)
240 .await
241 .map_err(|e| TransportError::Config(format!("failed to bind {addr}: {e}")))?;
242 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
243
244 let handle = tokio::spawn(async move {
245 router
246 .serve_with_incoming_shutdown(incoming, async {
247 sd_rx.await.ok();
248 })
249 .await
250 });
251
252 receiver = Some(tokio::sync::Mutex::new(rx));
253 shutdown_tx = Some(sd_tx);
254 server_handle = Some(handle);
255 }
256
257 let healthy = Arc::new(AtomicBool::new(true));
258
259 let filter_engine = super::filter::TransportFilterEngine::new(
260 &config.filters_in,
261 &config.filters_out,
262 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
263 )?;
264
265 #[cfg(feature = "health")]
266 {
267 let h = Arc::clone(&healthy);
268 crate::health::HealthRegistry::register("transport:grpc", move || {
269 if h.load(Ordering::Relaxed) {
270 crate::health::HealthStatus::Healthy
271 } else {
272 crate::health::HealthStatus::Unhealthy
273 }
274 });
275 }
276
277 Ok(Self {
278 client,
279 receiver,
280 shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
281 _server_handle: server_handle,
282 closed: AtomicBool::new(false),
283 healthy,
284 recv_timeout_ms: config.recv_timeout_ms,
285 send_timeout_ms: config.send_timeout_ms,
286 #[cfg(feature = "metrics")]
287 inflight: AtomicU64::new(0),
288 filter_engine,
289 })
290 }
291}
292
293impl TransportBase for GrpcTransport {
294 async fn close(&self) -> TransportResult<()> {
295 self.closed.store(true, Ordering::Relaxed);
296 self.healthy.store(false, Ordering::Relaxed);
297
298 if let Some(tx) = self.shutdown_tx.lock().take() {
302 let _ = tx.send(());
303 }
304 Ok(())
305 }
306
307 fn is_healthy(&self) -> bool {
308 let healthy = self.healthy.load(Ordering::Relaxed);
309 #[cfg(feature = "metrics")]
310 metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
311 1.0
312 } else {
313 0.0
314 });
315 healthy
316 }
317
318 fn name(&self) -> &'static str {
319 "grpc"
320 }
321}
322
323impl TransportSender for GrpcTransport {
324 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
325 if self.closed.load(Ordering::Relaxed) {
326 return SendResult::Fatal(TransportError::Closed);
327 }
328
329 if self.filter_engine.has_outbound_filters() {
331 match self.filter_engine.apply_outbound(&payload) {
332 super::filter::FilterDisposition::Pass => {}
333 super::filter::FilterDisposition::Drop => return SendResult::Ok,
334 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
335 }
336 }
337
338 let Some(client) = &self.client else {
339 return SendResult::Fatal(TransportError::Config(
340 "no endpoint configured for sending".into(),
341 ));
342 };
343
344 let mut metadata = HashMap::new();
345 if !key.is_empty() {
346 metadata.insert("topic".to_string(), key.to_string());
347 }
348
349 #[cfg(feature = "transport-trace")]
351 if let Some(tp) = super::propagation::current_traceparent() {
352 metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
353 }
354
355 let mut request = tonic::Request::new(proto::PushRequest {
356 payload: payload.to_vec(),
357 format: proto::Format::Auto.into(),
358 metadata,
359 });
360
361 if self.send_timeout_ms > 0 {
365 request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
366 }
367
368 #[cfg(feature = "metrics")]
369 let start = std::time::Instant::now();
370
371 #[cfg(feature = "metrics")]
372 self.inflight.fetch_add(1, Ordering::Relaxed);
373
374 let result = match client.clone().push(request).await {
376 Ok(_) => {
377 #[cfg(feature = "metrics")]
378 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
379 SendResult::Ok
380 }
381 Err(status) => match status.code() {
382 tonic::Code::Unavailable
386 | tonic::Code::ResourceExhausted
387 | tonic::Code::DeadlineExceeded => {
388 #[cfg(feature = "metrics")]
389 metrics::counter!(
390 "dfe_transport_backpressured_total",
391 "transport" => "grpc"
392 )
393 .increment(1);
394 SendResult::Backpressured
395 }
396 _ => {
397 #[cfg(feature = "metrics")]
398 metrics::counter!(
399 "dfe_transport_send_errors_total",
400 "transport" => "grpc"
401 )
402 .increment(1);
403 SendResult::Fatal(TransportError::Send(status.message().to_string()))
404 }
405 },
406 };
407
408 #[cfg(feature = "metrics")]
409 {
410 self.inflight.fetch_sub(1, Ordering::Relaxed);
411 metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
412 .set(self.inflight.load(Ordering::Relaxed) as f64);
413 metrics::histogram!(
414 "dfe_transport_send_duration_seconds",
415 "transport" => "grpc"
416 )
417 .record(start.elapsed().as_secs_f64());
418 }
419
420 result
421 }
422}
423
424impl TransportReceiver for GrpcTransport {
425 type Token = GrpcToken;
426
427 async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
428 if self.closed.load(Ordering::Relaxed) {
429 return Err(TransportError::Closed);
430 }
431
432 let Some(receiver) = &self.receiver else {
433 return Err(TransportError::Config(
434 "no listen address configured for receiving".into(),
435 ));
436 };
437
438 let mut rx = receiver.lock().await;
439 let mut messages = Vec::with_capacity(max.min(100));
440
441 for _ in 0..max {
442 let result = if self.recv_timeout_ms == 0 {
443 match rx.try_recv() {
445 Ok(msg) => Some(msg),
446 Err(mpsc::error::TryRecvError::Empty) => break,
447 Err(mpsc::error::TryRecvError::Disconnected) => {
448 return Err(TransportError::Closed);
449 }
450 }
451 } else if messages.is_empty() {
452 match tokio::time::timeout(
454 std::time::Duration::from_millis(self.recv_timeout_ms),
455 rx.recv(),
456 )
457 .await
458 {
459 Ok(Some(msg)) => Some(msg),
460 Ok(None) => return Err(TransportError::Closed),
461 Err(_) => break, }
463 } else {
464 match rx.try_recv() {
466 Ok(msg) => Some(msg),
467 Err(_) => break,
468 }
469 };
470
471 if let Some(msg) = result {
472 messages.push(msg);
473 }
474 }
475
476 let batch = self.filter_engine.partition_batch(
479 messages,
480 |m| m.payload.as_slice(),
481 |m| m.key.clone(),
482 );
483 let messages = batch.messages;
484 let dlq_entries = batch.dlq_entries;
485
486 Ok(RecvBatch {
487 messages,
488 dlq_entries,
489 })
490 }
491
492 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
493 Ok(())
496 }
497}
498
499impl Drop for GrpcTransport {
500 fn drop(&mut self) {
501 if let Some(tx) = self.shutdown_tx.lock().take() {
503 let _ = tx.send(());
504 }
505 }
507}
508
/// Server-side implementation of the `DfeTransport` gRPC service.
///
/// Each accepted push becomes a [`Message`] on the owning [`GrpcTransport`]'s
/// receive queue.
struct DfeTransportServiceImpl {
    /// Producer side of the bounded queue drained by `GrpcTransport::recv`.
    sender: mpsc::Sender<Message<GrpcToken>>,
    /// Counter that numbers received messages (used for `GrpcToken`). It is
    /// also handed to the Vector-compat service when that feature is enabled.
    sequence: Arc<AtomicU64>,
}
517
518#[tonic::async_trait]
519impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
520 async fn push(
521 &self,
522 request: Request<proto::PushRequest>,
523 ) -> Result<Response<proto::PushResponse>, Status> {
524 let req = request.into_inner();
525 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
526
527 #[cfg(feature = "transport-trace")]
529 if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
530 && super::propagation::is_valid_traceparent(tp)
531 {
532 tracing::Span::current().record("traceparent", tp.as_str());
533 }
534
535 let format = PayloadFormat::detect(&req.payload);
536 let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
537
538 let msg = Message {
539 key,
540 payload: req.payload,
541 token: GrpcToken::new(seq),
542 timestamp_ms: None,
543 format,
544 };
545
546 match self.sender.try_send(msg) {
547 Ok(()) => {
548 #[cfg(feature = "metrics")]
549 {
550 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
551 .increment(1);
552 metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
553 self.sender
554 .max_capacity()
555 .saturating_sub(self.sender.capacity()) as f64,
556 );
557 }
558 Ok(Response::new(proto::PushResponse { accepted: 1 }))
559 }
560 Err(mpsc::error::TrySendError::Full(_)) => {
561 #[cfg(feature = "metrics")]
562 metrics::counter!(
563 "dfe_transport_backpressured_total",
564 "transport" => "grpc"
565 )
566 .increment(1);
567 Err(Status::resource_exhausted("receiver buffer full"))
568 }
569 Err(mpsc::error::TrySendError::Closed(_)) => {
570 #[cfg(feature = "metrics")]
571 metrics::counter!(
572 "dfe_transport_refused_total",
573 "transport" => "grpc"
574 )
575 .increment(1);
576 Err(Status::unavailable("receiver closed"))
577 }
578 }
579 }
580
581 async fn health_check(
582 &self,
583 _request: Request<proto::HealthCheckRequest>,
584 ) -> Result<Response<proto::HealthCheckResponse>, Status> {
585 Ok(Response::new(proto::HealthCheckResponse {
586 status: proto::ServingStatus::Serving.into(),
587 }))
588 }
589}
590
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn grpc_token_display() {
        let token = GrpcToken::new(42);
        assert_eq!(format!("{token}"), "grpc:42");

        let token = GrpcToken::with_source(7, Arc::from("peer-1"));
        assert_eq!(format!("{token}"), "grpc:peer-1:7");
    }

    #[test]
    fn grpc_config_defaults() {
        let config = GrpcConfig::default();
        assert!(config.listen.is_none());
        assert!(config.endpoint.is_none());
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
        assert_eq!(config.send_timeout_ms, 30_000);
        assert_eq!(config.max_message_size, 16 * 1024 * 1024);
        assert!(!config.compression);
        assert!(!config.tls_enabled);
        assert!(config.tls_ca_path.is_none());
    }

    #[test]
    fn grpc_client_tls_builds_with_private_ca_and_rejects_half_mtls() {
        use std::io::Write;
        let cert = rcgen::generate_simple_self_signed(vec!["grpc.test".to_string()]).unwrap();
        let mut ca = tempfile::NamedTempFile::new().unwrap();
        ca.write_all(cert.cert.pem().as_bytes()).unwrap();
        ca.flush().unwrap();

        // A private CA plus an explicit domain builds cleanly.
        let cfg = GrpcConfig {
            endpoint: Some("https://peer:6000".to_string()),
            tls_enabled: true,
            tls_ca_path: Some(ca.path().to_string_lossy().into_owned()),
            tls_domain: Some("grpc.test".to_string()),
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&cfg).is_ok());

        // A client cert without its key is rejected.
        let cfg = GrpcConfig {
            tls_enabled: true,
            tls_client_cert_path: Some(ca.path().to_string_lossy().into_owned()),
            tls_client_key_path: None,
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&cfg).is_err());
    }

    #[test]
    fn grpc_config_server() {
        let config = GrpcConfig::server("0.0.0.0:6000");
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:6000"));
        assert!(config.endpoint.is_none());
    }

    #[test]
    fn grpc_config_client() {
        let config = GrpcConfig::client("http://loader:6000");
        assert!(config.listen.is_none());
        assert_eq!(config.endpoint.as_deref(), Some("http://loader:6000"));
    }

    #[test]
    fn grpc_config_with_compression() {
        let config = GrpcConfig::server("0.0.0.0:6000").with_compression();
        assert!(config.compression);
    }

    #[tokio::test]
    async fn grpc_transport_client_only() {
        // The channel connects lazily, so nothing dials this endpoint during
        // the test and the port does not need to be free.
        let config = GrpcConfig::client("http://localhost:16000");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_some());
        assert!(transport.receiver.is_none());
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "grpc");

        // No listen address: receiving is a configuration error.
        let result = transport.recv(10).await;
        assert!(matches!(result, Err(TransportError::Config(_))));

        transport.commit(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn grpc_transport_server_only() {
        // Port 0 lets the OS choose a free port, so this test cannot collide
        // with parallel tests or other local services.
        let config = GrpcConfig::server("127.0.0.1:0");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_none());
        assert!(transport.receiver.is_some());
        assert!(transport.is_healthy());

        // No endpoint: sending is fatal.
        let result = transport
            .send("test", bytes::Bytes::from_static(b"payload"))
            .await;
        assert!(result.is_fatal());

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
        assert!(matches!(transport.recv(1).await, Err(TransportError::Closed)));
    }
}