hyperi_rustlib/transport/grpc/
mod.rs1pub mod config;
40pub mod proto;
41pub mod token;
42
43pub use config::GrpcConfig;
44pub use token::GrpcToken;
45
46use super::error::{TransportError, TransportResult};
47use super::traits::{TransportBase, TransportReceiver, TransportSender};
48use super::types::{Message, PayloadFormat, SendResult};
49use std::collections::HashMap;
50use std::sync::Arc;
51use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
52use tokio::sync::{mpsc, oneshot};
53use tonic::{Request, Response, Status};
54
55pub struct GrpcTransport {
60 client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,
62
63 receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,
65
66 shutdown_tx: Option<oneshot::Sender<()>>,
68
69 _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,
71
72 closed: AtomicBool,
74
75 healthy: Arc<AtomicBool>,
77
78 recv_timeout_ms: u64,
80
81 #[cfg(feature = "metrics")]
83 inflight: AtomicU64,
84
85 filter_engine: super::filter::TransportFilterEngine,
87
88 filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
91}
92
93impl GrpcTransport {
94 pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
106 let mut client = None;
107 let mut receiver = None;
108 let mut shutdown_tx = None;
109 let mut server_handle = None;
110 let sequence = Arc::new(AtomicU64::new(0));
111
112 if let Some(endpoint) = &config.endpoint {
114 let channel = tonic::transport::Channel::from_shared(endpoint.clone())
115 .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?
116 .connect_lazy();
117
118 let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
119 .max_decoding_message_size(config.max_message_size)
120 .max_encoding_message_size(config.max_message_size);
121
122 if config.compression {
123 c = c
124 .send_compressed(tonic::codec::CompressionEncoding::Gzip)
125 .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
126 }
127
128 client = Some(c);
129 }
130
131 if let Some(listen) = &config.listen {
133 let addr: std::net::SocketAddr = listen
134 .parse()
135 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
136
137 let (tx, rx) = mpsc::channel(config.recv_buffer_size);
138 let (sd_tx, sd_rx) = oneshot::channel();
139
140 let dfe_svc = DfeTransportServiceImpl {
142 sender: tx.clone(),
143 sequence: sequence.clone(),
144 };
145
146 let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
147 .max_decoding_message_size(config.max_message_size)
148 .max_encoding_message_size(config.max_message_size)
149 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
150 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
151
152 let mut builder = tonic::transport::Server::builder();
154
155 #[cfg(feature = "transport-grpc-vector-compat")]
156 let router = if config.vector_compat {
157 let vector_svc =
158 super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
159 let vector_server =
160 super::vector_compat::proto::vector::vector_server::VectorServer::new(
161 vector_svc,
162 )
163 .max_decoding_message_size(config.max_message_size)
164 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
165 .send_compressed(tonic::codec::CompressionEncoding::Gzip);
166
167 builder.add_service(dfe_server).add_service(vector_server)
168 } else {
169 builder.add_service(dfe_server)
170 };
171
172 #[cfg(not(feature = "transport-grpc-vector-compat"))]
173 let router = builder.add_service(dfe_server);
174
175 let handle = tokio::spawn(async move {
176 router
177 .serve_with_shutdown(addr, async {
178 sd_rx.await.ok();
179 })
180 .await
181 });
182
183 receiver = Some(tokio::sync::Mutex::new(rx));
184 shutdown_tx = Some(sd_tx);
185 server_handle = Some(handle);
186 }
187
188 let healthy = Arc::new(AtomicBool::new(true));
189
190 let filter_engine = super::filter::TransportFilterEngine::new(
191 &config.filters_in,
192 &config.filters_out,
193 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
194 )?;
195
196 #[cfg(feature = "health")]
197 {
198 let h = Arc::clone(&healthy);
199 crate::health::HealthRegistry::register("transport:grpc", move || {
200 if h.load(Ordering::Relaxed) {
201 crate::health::HealthStatus::Healthy
202 } else {
203 crate::health::HealthStatus::Unhealthy
204 }
205 });
206 }
207
208 Ok(Self {
209 client,
210 receiver,
211 shutdown_tx,
212 _server_handle: server_handle,
213 closed: AtomicBool::new(false),
214 healthy,
215 recv_timeout_ms: config.recv_timeout_ms,
216 #[cfg(feature = "metrics")]
217 inflight: AtomicU64::new(0),
218 filter_engine,
219 filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
220 })
221 }
222}
223
224impl TransportBase for GrpcTransport {
225 async fn close(&self) -> TransportResult<()> {
226 self.closed.store(true, Ordering::Relaxed);
227 self.healthy.store(false, Ordering::Relaxed);
228
229 Ok(())
233 }
234
235 fn is_healthy(&self) -> bool {
236 let healthy = self.healthy.load(Ordering::Relaxed);
237 #[cfg(feature = "metrics")]
238 metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
239 1.0
240 } else {
241 0.0
242 });
243 healthy
244 }
245
246 fn name(&self) -> &'static str {
247 "grpc"
248 }
249}
250
251impl TransportSender for GrpcTransport {
252 async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
253 if self.closed.load(Ordering::Relaxed) {
254 return SendResult::Fatal(TransportError::Closed);
255 }
256
257 if self.filter_engine.has_outbound_filters() {
259 match self.filter_engine.apply_outbound(payload) {
260 super::filter::FilterDisposition::Pass => {}
261 super::filter::FilterDisposition::Drop => return SendResult::Ok,
262 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
263 }
264 }
265
266 let Some(client) = &self.client else {
267 return SendResult::Fatal(TransportError::Config(
268 "no endpoint configured for sending".into(),
269 ));
270 };
271
272 let mut metadata = HashMap::new();
273 if !key.is_empty() {
274 metadata.insert("topic".to_string(), key.to_string());
275 }
276
277 #[cfg(feature = "transport-trace")]
279 if let Some(tp) = super::propagation::current_traceparent() {
280 metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
281 }
282
283 let request = proto::PushRequest {
284 payload: payload.to_vec(),
285 format: proto::Format::Auto.into(),
286 metadata,
287 };
288
289 #[cfg(feature = "metrics")]
290 let start = std::time::Instant::now();
291
292 #[cfg(feature = "metrics")]
293 self.inflight.fetch_add(1, Ordering::Relaxed);
294
295 let result = match client.clone().push(request).await {
297 Ok(_) => {
298 #[cfg(feature = "metrics")]
299 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
300 SendResult::Ok
301 }
302 Err(status) => match status.code() {
303 tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
304 #[cfg(feature = "metrics")]
305 metrics::counter!(
306 "dfe_transport_backpressured_total",
307 "transport" => "grpc"
308 )
309 .increment(1);
310 SendResult::Backpressured
311 }
312 _ => {
313 #[cfg(feature = "metrics")]
314 metrics::counter!(
315 "dfe_transport_send_errors_total",
316 "transport" => "grpc"
317 )
318 .increment(1);
319 SendResult::Fatal(TransportError::Send(status.message().to_string()))
320 }
321 },
322 };
323
324 #[cfg(feature = "metrics")]
325 {
326 self.inflight.fetch_sub(1, Ordering::Relaxed);
327 metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
328 .set(self.inflight.load(Ordering::Relaxed) as f64);
329 metrics::histogram!(
330 "dfe_transport_send_duration_seconds",
331 "transport" => "grpc"
332 )
333 .record(start.elapsed().as_secs_f64());
334 }
335
336 result
337 }
338}
339
340impl TransportReceiver for GrpcTransport {
341 type Token = GrpcToken;
342
343 async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
344 if self.closed.load(Ordering::Relaxed) {
345 return Err(TransportError::Closed);
346 }
347
348 let Some(receiver) = &self.receiver else {
349 return Err(TransportError::Config(
350 "no listen address configured for receiving".into(),
351 ));
352 };
353
354 let mut rx = receiver.lock().await;
355 let mut messages = Vec::with_capacity(max.min(100));
356
357 for _ in 0..max {
358 let result = if self.recv_timeout_ms == 0 {
359 match rx.try_recv() {
361 Ok(msg) => Some(msg),
362 Err(mpsc::error::TryRecvError::Empty) => break,
363 Err(mpsc::error::TryRecvError::Disconnected) => {
364 return Err(TransportError::Closed);
365 }
366 }
367 } else if messages.is_empty() {
368 match tokio::time::timeout(
370 std::time::Duration::from_millis(self.recv_timeout_ms),
371 rx.recv(),
372 )
373 .await
374 {
375 Ok(Some(msg)) => Some(msg),
376 Ok(None) => return Err(TransportError::Closed),
377 Err(_) => break, }
379 } else {
380 match rx.try_recv() {
382 Ok(msg) => Some(msg),
383 Err(_) => break,
384 }
385 };
386
387 if let Some(msg) = result {
388 messages.push(msg);
389 }
390 }
391
392 if self.filter_engine.has_inbound_filters() {
394 let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
395 messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
396 super::filter::FilterDisposition::Pass => true,
397 super::filter::FilterDisposition::Drop => false,
398 super::filter::FilterDisposition::Dlq => {
399 staged_dlq.push(super::filter::FilteredDlqEntry {
400 payload: msg.payload.clone(),
401 key: msg.key.clone(),
402 reason: "transport filter".to_string(),
403 });
404 false
405 }
406 });
407 if !staged_dlq.is_empty() {
408 self.filtered_dlq_buffer.lock().extend(staged_dlq);
409 }
410 }
411
412 Ok(messages)
413 }
414
415 fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
416 std::mem::take(&mut *self.filtered_dlq_buffer.lock())
417 }
418
419 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
420 Ok(())
423 }
424}
425
426impl Drop for GrpcTransport {
427 fn drop(&mut self) {
428 if let Some(tx) = self.shutdown_tx.take() {
430 let _ = tx.send(());
431 }
432 }
434}
435
436struct DfeTransportServiceImpl {
441 sender: mpsc::Sender<Message<GrpcToken>>,
442 sequence: Arc<AtomicU64>,
443}
444
445#[tonic::async_trait]
446impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
447 async fn push(
448 &self,
449 request: Request<proto::PushRequest>,
450 ) -> Result<Response<proto::PushResponse>, Status> {
451 let req = request.into_inner();
452 let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
453
454 #[cfg(feature = "transport-trace")]
456 if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
457 && super::propagation::is_valid_traceparent(tp)
458 {
459 tracing::Span::current().record("traceparent", tp.as_str());
460 }
461
462 let format = PayloadFormat::detect(&req.payload);
463 let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
464
465 let msg = Message {
466 key,
467 payload: req.payload,
468 token: GrpcToken::new(seq),
469 timestamp_ms: None,
470 format,
471 };
472
473 match self.sender.try_send(msg) {
474 Ok(()) => {
475 #[cfg(feature = "metrics")]
476 {
477 metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
478 .increment(1);
479 metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
480 self.sender
481 .max_capacity()
482 .saturating_sub(self.sender.capacity()) as f64,
483 );
484 }
485 Ok(Response::new(proto::PushResponse { accepted: 1 }))
486 }
487 Err(mpsc::error::TrySendError::Full(_)) => {
488 #[cfg(feature = "metrics")]
489 metrics::counter!(
490 "dfe_transport_backpressured_total",
491 "transport" => "grpc"
492 )
493 .increment(1);
494 Err(Status::resource_exhausted("receiver buffer full"))
495 }
496 Err(mpsc::error::TrySendError::Closed(_)) => {
497 #[cfg(feature = "metrics")]
498 metrics::counter!(
499 "dfe_transport_refused_total",
500 "transport" => "grpc"
501 )
502 .increment(1);
503 Err(Status::unavailable("receiver closed"))
504 }
505 }
506 }
507
508 async fn health_check(
509 &self,
510 _request: Request<proto::HealthCheckRequest>,
511 ) -> Result<Response<proto::HealthCheckResponse>, Status> {
512 Ok(Response::new(proto::HealthCheckResponse {
513 status: proto::ServingStatus::Serving.into(),
514 }))
515 }
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Tokens render as `grpc:<seq>` or `grpc:<source>:<seq>`.
    #[test]
    fn grpc_token_display() {
        let plain = GrpcToken::new(42);
        assert_eq!(plain.to_string(), "grpc:42");

        let with_source = GrpcToken::with_source(7, Arc::from("peer-1"));
        assert_eq!(with_source.to_string(), "grpc:peer-1:7");
    }

    /// The default config has neither side configured and uses the expected
    /// buffer, timeout and size limits.
    #[test]
    fn grpc_config_defaults() {
        let defaults = GrpcConfig::default();

        assert!(defaults.listen.is_none());
        assert!(defaults.endpoint.is_none());
        assert_eq!(defaults.recv_buffer_size, 10_000);
        assert_eq!(defaults.recv_timeout_ms, 100);
        assert_eq!(defaults.max_message_size, 16 * 1024 * 1024);
        assert!(!defaults.compression);
    }

    /// `GrpcConfig::server` sets only the listen address.
    #[test]
    fn grpc_config_server() {
        let server_cfg = GrpcConfig::server("0.0.0.0:6000");

        assert_eq!(server_cfg.listen.as_deref(), Some("0.0.0.0:6000"));
        assert!(server_cfg.endpoint.is_none());
    }

    /// `GrpcConfig::client` sets only the endpoint.
    #[test]
    fn grpc_config_client() {
        let client_cfg = GrpcConfig::client("http://loader:6000");

        assert!(client_cfg.listen.is_none());
        assert_eq!(client_cfg.endpoint.as_deref(), Some("http://loader:6000"));
    }

    /// `with_compression` turns the compression flag on.
    #[test]
    fn grpc_config_with_compression() {
        let compressed = GrpcConfig::server("0.0.0.0:6000").with_compression();
        assert!(compressed.compression);
    }

    /// A client-only transport can be built, but `recv` fails because no
    /// listen address is configured.
    #[tokio::test]
    async fn grpc_transport_client_only() {
        let config = GrpcConfig::client("http://localhost:16000");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_some());
        assert!(transport.receiver.is_none());
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "grpc");

        let received = transport.recv(10).await;
        assert!(received.is_err());

        transport.commit(&[]).await.unwrap();
    }

    /// A server-only transport rejects `send` with a fatal result and
    /// reports unhealthy after `close`.
    #[tokio::test]
    async fn grpc_transport_server_only() {
        let config = GrpcConfig::server("127.0.0.1:16001");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_none());
        assert!(transport.receiver.is_some());
        assert!(transport.is_healthy());

        let sent = transport.send("test", b"payload").await;
        assert!(sent.is_fatal());

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
    }
}