hyperi_rustlib/transport/grpc/
mod.rs1pub mod config;
40pub mod proto;
41pub mod token;
42
43pub use config::GrpcConfig;
44pub use token::GrpcToken;
45
46use super::error::{TransportError, TransportResult};
47use super::traits::{TransportBase, TransportReceiver, TransportSender};
48use super::types::{Message, PayloadFormat, SendResult};
49use std::collections::HashMap;
50use std::sync::Arc;
51use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
52use tokio::sync::{mpsc, oneshot};
53use tonic::{Request, Response, Status};
54
55pub struct GrpcTransport {
60 client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,
62
63 receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,
65
66 shutdown_tx: Option<oneshot::Sender<()>>,
68
69 _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,
71
72 closed: AtomicBool,
74
75 healthy: Arc<AtomicBool>,
77
78 recv_timeout_ms: u64,
80
81 #[cfg(feature = "metrics")]
83 inflight: AtomicU64,
84
85 filter_engine: super::filter::TransportFilterEngine,
87
88 filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
91}
92
93impl GrpcTransport {
94 pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
106 let mut client = None;
107 let mut receiver = None;
108 let mut shutdown_tx = None;
109 let mut server_handle = None;
110 let sequence = Arc::new(AtomicU64::new(0));
111
112 if let Some(endpoint) = &config.endpoint {
114 let channel = tonic::transport::Channel::from_shared(endpoint.clone())
115 .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?
116 .connect_lazy();
117
118 let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
119 .max_decoding_message_size(config.max_message_size)
120 .max_encoding_message_size(config.max_message_size);
121
122 if config.compression {
123 c = c
124 .send_compressed(tonic::codec::CompressionEncoding::Gzip)
125 .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
126 }
127
128 client = Some(c);
129 }
130
131 if let Some(listen) = &config.listen {
133 let addr: std::net::SocketAddr = listen
134 .parse()
135 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
136
137 let (tx, rx) = mpsc::channel(config.recv_buffer_size);
138 let (sd_tx, sd_rx) = oneshot::channel();
139
140 let dfe_svc = DfeTransportServiceImpl {
142 sender: tx.clone(),
143 sequence: sequence.clone(),
144 };
145
146 let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
147 .max_decoding_message_size(config.max_message_size)
148 .max_encoding_message_size(config.max_message_size)
149 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
150 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
151
152 let mut builder = tonic::transport::Server::builder();
154
155 #[cfg(feature = "transport-grpc-vector-compat")]
156 let router = if config.vector_compat {
157 let vector_svc =
158 super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
159 let vector_server =
160 super::vector_compat::proto::vector::vector_server::VectorServer::new(
161 vector_svc,
162 )
163 .max_decoding_message_size(config.max_message_size)
164 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
165 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
166
167 builder.add_service(dfe_server).add_service(vector_server)
168 } else {
169 builder.add_service(dfe_server)
170 };
171
172 #[cfg(not(feature = "transport-grpc-vector-compat"))]
173 let router = builder.add_service(dfe_server);
174
175 let listener = tokio::net::TcpListener::bind(addr)
183 .await
184 .map_err(|e| TransportError::Config(format!("failed to bind {addr}: {e}")))?;
185 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
186
187 let handle = tokio::spawn(async move {
188 router
189 .serve_with_incoming_shutdown(incoming, async {
190 sd_rx.await.ok();
191 })
192 .await
193 });
194
195 receiver = Some(tokio::sync::Mutex::new(rx));
196 shutdown_tx = Some(sd_tx);
197 server_handle = Some(handle);
198 }
199
200 let healthy = Arc::new(AtomicBool::new(true));
201
202 let filter_engine = super::filter::TransportFilterEngine::new(
203 &config.filters_in,
204 &config.filters_out,
205 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
206 )?;
207
208 #[cfg(feature = "health")]
209 {
210 let h = Arc::clone(&healthy);
211 crate::health::HealthRegistry::register("transport:grpc", move || {
212 if h.load(Ordering::Relaxed) {
213 crate::health::HealthStatus::Healthy
214 } else {
215 crate::health::HealthStatus::Unhealthy
216 }
217 });
218 }
219
220 Ok(Self {
221 client,
222 receiver,
223 shutdown_tx,
224 _server_handle: server_handle,
225 closed: AtomicBool::new(false),
226 healthy,
227 recv_timeout_ms: config.recv_timeout_ms,
228 #[cfg(feature = "metrics")]
229 inflight: AtomicU64::new(0),
230 filter_engine,
231 filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
232 })
233 }
234}
235
236impl TransportBase for GrpcTransport {
237 async fn close(&self) -> TransportResult<()> {
238 self.closed.store(true, Ordering::Relaxed);
239 self.healthy.store(false, Ordering::Relaxed);
240
241 Ok(())
245 }
246
247 fn is_healthy(&self) -> bool {
248 let healthy = self.healthy.load(Ordering::Relaxed);
249 #[cfg(feature = "metrics")]
250 metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
251 1.0
252 } else {
253 0.0
254 });
255 healthy
256 }
257
258 fn name(&self) -> &'static str {
259 "grpc"
260 }
261}
262
263impl TransportSender for GrpcTransport {
264 async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
265 if self.closed.load(Ordering::Relaxed) {
266 return SendResult::Fatal(TransportError::Closed);
267 }
268
269 if self.filter_engine.has_outbound_filters() {
271 match self.filter_engine.apply_outbound(payload) {
272 super::filter::FilterDisposition::Pass => {}
273 super::filter::FilterDisposition::Drop => return SendResult::Ok,
274 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
275 }
276 }
277
278 let Some(client) = &self.client else {
279 return SendResult::Fatal(TransportError::Config(
280 "no endpoint configured for sending".into(),
281 ));
282 };
283
284 let mut metadata = HashMap::new();
285 if !key.is_empty() {
286 metadata.insert("topic".to_string(), key.to_string());
287 }
288
289 #[cfg(feature = "transport-trace")]
291 if let Some(tp) = super::propagation::current_traceparent() {
292 metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
293 }
294
295 let request = proto::PushRequest {
296 payload: payload.to_vec(),
297 format: proto::Format::Auto.into(),
298 metadata,
299 };
300
301 #[cfg(feature = "metrics")]
302 let start = std::time::Instant::now();
303
304 #[cfg(feature = "metrics")]
305 self.inflight.fetch_add(1, Ordering::Relaxed);
306
307 let result = match client.clone().push(request).await {
309 Ok(_) => {
310 #[cfg(feature = "metrics")]
311 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
312 SendResult::Ok
313 }
314 Err(status) => match status.code() {
315 tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
316 #[cfg(feature = "metrics")]
317 metrics::counter!(
318 "dfe_transport_backpressured_total",
319 "transport" => "grpc"
320 )
321 .increment(1);
322 SendResult::Backpressured
323 }
324 _ => {
325 #[cfg(feature = "metrics")]
326 metrics::counter!(
327 "dfe_transport_send_errors_total",
328 "transport" => "grpc"
329 )
330 .increment(1);
331 SendResult::Fatal(TransportError::Send(status.message().to_string()))
332 }
333 },
334 };
335
336 #[cfg(feature = "metrics")]
337 {
338 self.inflight.fetch_sub(1, Ordering::Relaxed);
339 metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
340 .set(self.inflight.load(Ordering::Relaxed) as f64);
341 metrics::histogram!(
342 "dfe_transport_send_duration_seconds",
343 "transport" => "grpc"
344 )
345 .record(start.elapsed().as_secs_f64());
346 }
347
348 result
349 }
350}
351
352impl TransportReceiver for GrpcTransport {
353 type Token = GrpcToken;
354
355 async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
356 if self.closed.load(Ordering::Relaxed) {
357 return Err(TransportError::Closed);
358 }
359
360 let Some(receiver) = &self.receiver else {
361 return Err(TransportError::Config(
362 "no listen address configured for receiving".into(),
363 ));
364 };
365
366 let mut rx = receiver.lock().await;
367 let mut messages = Vec::with_capacity(max.min(100));
368
369 for _ in 0..max {
370 let result = if self.recv_timeout_ms == 0 {
371 match rx.try_recv() {
373 Ok(msg) => Some(msg),
374 Err(mpsc::error::TryRecvError::Empty) => break,
375 Err(mpsc::error::TryRecvError::Disconnected) => {
376 return Err(TransportError::Closed);
377 }
378 }
379 } else if messages.is_empty() {
380 match tokio::time::timeout(
382 std::time::Duration::from_millis(self.recv_timeout_ms),
383 rx.recv(),
384 )
385 .await
386 {
387 Ok(Some(msg)) => Some(msg),
388 Ok(None) => return Err(TransportError::Closed),
389 Err(_) => break, }
391 } else {
392 match rx.try_recv() {
394 Ok(msg) => Some(msg),
395 Err(_) => break,
396 }
397 };
398
399 if let Some(msg) = result {
400 messages.push(msg);
401 }
402 }
403
404 if self.filter_engine.has_inbound_filters() {
406 let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
407 messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
408 super::filter::FilterDisposition::Pass => true,
409 super::filter::FilterDisposition::Drop => false,
410 super::filter::FilterDisposition::Dlq => {
411 staged_dlq.push(super::filter::FilteredDlqEntry {
412 payload: msg.payload.clone(),
413 key: msg.key.clone(),
414 reason: "transport filter".to_string(),
415 });
416 false
417 }
418 });
419 if !staged_dlq.is_empty() {
420 self.filtered_dlq_buffer.lock().extend(staged_dlq);
421 }
422 }
423
424 Ok(messages)
425 }
426
427 fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
428 std::mem::take(&mut *self.filtered_dlq_buffer.lock())
429 }
430
431 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
432 Ok(())
435 }
436}
437
438impl Drop for GrpcTransport {
439 fn drop(&mut self) {
440 if let Some(tx) = self.shutdown_tx.take() {
442 let _ = tx.send(());
443 }
444 }
446}
447
448struct DfeTransportServiceImpl {
453 sender: mpsc::Sender<Message<GrpcToken>>,
454 sequence: Arc<AtomicU64>,
455}
456
457#[tonic::async_trait]
458impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
459 async fn push(
460 &self,
461 request: Request<proto::PushRequest>,
462 ) -> Result<Response<proto::PushResponse>, Status> {
463 let req = request.into_inner();
464 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
465
466 #[cfg(feature = "transport-trace")]
468 if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
469 && super::propagation::is_valid_traceparent(tp)
470 {
471 tracing::Span::current().record("traceparent", tp.as_str());
472 }
473
474 let format = PayloadFormat::detect(&req.payload);
475 let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
476
477 let msg = Message {
478 key,
479 payload: req.payload,
480 token: GrpcToken::new(seq),
481 timestamp_ms: None,
482 format,
483 };
484
485 match self.sender.try_send(msg) {
486 Ok(()) => {
487 #[cfg(feature = "metrics")]
488 {
489 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
490 .increment(1);
491 metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
492 self.sender
493 .max_capacity()
494 .saturating_sub(self.sender.capacity()) as f64,
495 );
496 }
497 Ok(Response::new(proto::PushResponse { accepted: 1 }))
498 }
499 Err(mpsc::error::TrySendError::Full(_)) => {
500 #[cfg(feature = "metrics")]
501 metrics::counter!(
502 "dfe_transport_backpressured_total",
503 "transport" => "grpc"
504 )
505 .increment(1);
506 Err(Status::resource_exhausted("receiver buffer full"))
507 }
508 Err(mpsc::error::TrySendError::Closed(_)) => {
509 #[cfg(feature = "metrics")]
510 metrics::counter!(
511 "dfe_transport_refused_total",
512 "transport" => "grpc"
513 )
514 .increment(1);
515 Err(Status::unavailable("receiver closed"))
516 }
517 }
518 }
519
520 async fn health_check(
521 &self,
522 _request: Request<proto::HealthCheckRequest>,
523 ) -> Result<Response<proto::HealthCheckResponse>, Status> {
524 Ok(Response::new(proto::HealthCheckResponse {
525 status: proto::ServingStatus::Serving.into(),
526 }))
527 }
528}
529
530#[cfg(test)]
531mod tests {
532 use super::*;
533
534 #[test]
535 fn grpc_token_display() {
536 let token = GrpcToken::new(42);
537 assert_eq!(format!("{token}"), "grpc:42");
538
539 let token = GrpcToken::with_source(7, Arc::from("peer-1"));
540 assert_eq!(format!("{token}"), "grpc:peer-1:7");
541 }
542
543 #[test]
544 fn grpc_config_defaults() {
545 let config = GrpcConfig::default();
546 assert!(config.listen.is_none());
547 assert!(config.endpoint.is_none());
548 assert_eq!(config.recv_buffer_size, 10_000);
549 assert_eq!(config.recv_timeout_ms, 100);
550 assert_eq!(config.max_message_size, 16 * 1024 * 1024);
551 assert!(!config.compression);
552 }
553
554 #[test]
555 fn grpc_config_server() {
556 let config = GrpcConfig::server("0.0.0.0:6000");
557 assert_eq!(config.listen.as_deref(), Some("0.0.0.0:6000"));
558 assert!(config.endpoint.is_none());
559 }
560
561 #[test]
562 fn grpc_config_client() {
563 let config = GrpcConfig::client("http://loader:6000");
564 assert!(config.listen.is_none());
565 assert_eq!(config.endpoint.as_deref(), Some("http://loader:6000"));
566 }
567
568 #[test]
569 fn grpc_config_with_compression() {
570 let config = GrpcConfig::server("0.0.0.0:6000").with_compression();
571 assert!(config.compression);
572 }
573
574 #[tokio::test]
575 async fn grpc_transport_client_only() {
576 let config = GrpcConfig::client("http://localhost:16000");
578 let transport = GrpcTransport::new(&config).await.unwrap();
579
580 assert!(transport.client.is_some());
581 assert!(transport.receiver.is_none());
582 assert!(transport.is_healthy());
583 assert_eq!(transport.name(), "grpc");
584
585 let result = transport.recv(10).await;
587 assert!(result.is_err());
588
589 transport.commit(&[]).await.unwrap();
591 }
592
593 #[tokio::test]
594 async fn grpc_transport_server_only() {
595 let config = GrpcConfig::server("127.0.0.1:16001");
598 let transport = GrpcTransport::new(&config).await.unwrap();
599
600 assert!(transport.client.is_none());
601 assert!(transport.receiver.is_some());
602 assert!(transport.is_healthy());
603
604 let result = transport.send("test", b"payload").await;
606 assert!(result.is_fatal());
607
608 transport.close().await.unwrap();
610 assert!(!transport.is_healthy());
611 }
612}