Skip to main content

hyperi_rustlib/transport/grpc/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/grpc/mod.rs
3// Purpose:   gRPC transport backend
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # gRPC Transport
10//!
11//! DFE native gRPC transport using tonic. Supports client mode (sending),
12//! server mode (receiving), or both.
13//!
14//! ## DFE Native Protocol
15//!
16//! Lightweight bulk bytes transfer via `dfe.transport.v1.DfeTransport/Push`.
17//! Payload is opaque bytes (JSON, MsgPack, or Arrow IPC) with a format hint.
18//!
19//! ## Vector Wire Protocol Compatibility (optional)
20//!
21//! When the `transport-grpc-vector-compat` feature is enabled and
22//! `GrpcConfig::vector_compat` is true, the server also accepts
23//! `vector.Vector/PushEvents` RPCs from legacy Vector sinks.
24//!
25//! ## Example
26//!
27//! ```rust,ignore
28//! use hyperi_rustlib::transport::{GrpcTransport, GrpcConfig, TransportReceiver};
29//!
30//! // Server mode (receive from remote senders)
31//! let config = GrpcConfig::server("0.0.0.0:6000");
32//! let transport = GrpcTransport::new(&config).await?;
33//!
34//! let messages = transport.recv(100).await?;
35//! // commit is a no-op for gRPC (no persistence)
36//! transport.commit(&[]).await?;
37//! ```
38
39pub mod config;
40pub mod proto;
41pub mod token;
42
43pub use config::GrpcConfig;
44pub use token::GrpcToken;
45
46use super::error::{TransportError, TransportResult};
47use super::traits::{TransportBase, TransportReceiver, TransportSender};
48use super::types::{Message, PayloadFormat, SendResult};
49use std::collections::HashMap;
50use std::sync::Arc;
51use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
52use tokio::sync::{mpsc, oneshot};
53use tonic::{Request, Response, Status};
54
55/// gRPC transport for DFE inter-service communication.
56///
57/// Implements both `TransportSender` and `TransportReceiver`, so it also
58/// satisfies the unified `Transport` trait via blanket impl.
59pub struct GrpcTransport {
60    /// Client for sending (None if server-only mode).
61    client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,
62
63    /// Receiver channel (None if client-only mode).
64    receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,
65
66    /// Shutdown signal for the server task.
67    shutdown_tx: Option<oneshot::Sender<()>>,
68
69    /// Server background task handle (kept alive, aborted on drop).
70    _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,
71
72    /// Whether the transport is closed.
73    closed: AtomicBool,
74
75    /// Shared healthy flag -- read by health registry closure, written by close().
76    healthy: Arc<AtomicBool>,
77
78    /// Receive timeout (milliseconds).
79    recv_timeout_ms: u64,
80
81    /// In-flight send count (for metrics).
82    #[cfg(feature = "metrics")]
83    inflight: AtomicU64,
84
85    /// Transport-level message filter engine.
86    filter_engine: super::filter::TransportFilterEngine,
87
88    /// Buffer for messages staged to DLQ by inbound filters.
89    /// Drained by `take_filtered_dlq_entries()`.
90    filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
91}
92
93impl GrpcTransport {
94    /// Create a new gRPC transport.
95    ///
96    /// # Configuration
97    ///
98    /// - Set `config.listen` to start a gRPC server (receive mode).
99    /// - Set `config.endpoint` to connect to a remote server (send mode).
100    /// - Set both for bidirectional communication.
101    ///
102    /// # Errors
103    ///
104    /// Returns error if the listen address is invalid or the server fails to start.
105    pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
106        let mut client = None;
107        let mut receiver = None;
108        let mut shutdown_tx = None;
109        let mut server_handle = None;
110        let sequence = Arc::new(AtomicU64::new(0));
111
112        // Set up client (lazy connection -- doesn't fail until first RPC)
113        if let Some(endpoint) = &config.endpoint {
114            let channel = tonic::transport::Channel::from_shared(endpoint.clone())
115                .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?
116                .connect_lazy();
117
118            let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
119                .max_decoding_message_size(config.max_message_size)
120                .max_encoding_message_size(config.max_message_size);
121
122            if config.compression {
123                c = c
124                    .send_compressed(tonic::codec::CompressionEncoding::Gzip)
125                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
126            }
127
128            client = Some(c);
129        }
130
131        // Set up server
132        if let Some(listen) = &config.listen {
133            let addr: std::net::SocketAddr = listen
134                .parse()
135                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
136
137            let (tx, rx) = mpsc::channel(config.recv_buffer_size);
138            let (sd_tx, sd_rx) = oneshot::channel();
139
140            // DFE native service
141            let dfe_svc = DfeTransportServiceImpl {
142                sender: tx.clone(),
143                sequence: sequence.clone(),
144            };
145
146            let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
147                .max_decoding_message_size(config.max_message_size)
148                .max_encoding_message_size(config.max_message_size)
149                .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
150                .send_compressed(tonic::codec::CompressionEncoding::Gzip);
151
152            // Build server with optional Vector compat
153            let mut builder = tonic::transport::Server::builder();
154
155            #[cfg(feature = "transport-grpc-vector-compat")]
156            let router = if config.vector_compat {
157                let vector_svc =
158                    super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
159                let vector_server =
160                    super::vector_compat::proto::vector::vector_server::VectorServer::new(
161                        vector_svc,
162                    )
163                    .max_decoding_message_size(config.max_message_size)
164                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
165                    .send_compressed(tonic::codec::CompressionEncoding::Gzip);
166
167                builder.add_service(dfe_server).add_service(vector_server)
168            } else {
169                builder.add_service(dfe_server)
170            };
171
172            #[cfg(not(feature = "transport-grpc-vector-compat"))]
173            let router = builder.add_service(dfe_server);
174
175            // Bind the listener synchronously BEFORE spawning the serve task.
176            // Once `TcpListener::bind` returns the OS socket is listening and
177            // queues incoming connections, so `new()` returning is a true
178            // readiness signal -- callers (and their tests) can connect
179            // immediately with no polling. `serve_with_shutdown(addr, ..)`
180            // bound inside the spawned task, which made `new()` return before
181            // the socket existed and forced every consumer to poll the port.
182            let listener = tokio::net::TcpListener::bind(addr)
183                .await
184                .map_err(|e| TransportError::Config(format!("failed to bind {addr}: {e}")))?;
185            let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
186
187            let handle = tokio::spawn(async move {
188                router
189                    .serve_with_incoming_shutdown(incoming, async {
190                        sd_rx.await.ok();
191                    })
192                    .await
193            });
194
195            receiver = Some(tokio::sync::Mutex::new(rx));
196            shutdown_tx = Some(sd_tx);
197            server_handle = Some(handle);
198        }
199
200        let healthy = Arc::new(AtomicBool::new(true));
201
202        let filter_engine = super::filter::TransportFilterEngine::new(
203            &config.filters_in,
204            &config.filters_out,
205            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
206        )?;
207
208        #[cfg(feature = "health")]
209        {
210            let h = Arc::clone(&healthy);
211            crate::health::HealthRegistry::register("transport:grpc", move || {
212                if h.load(Ordering::Relaxed) {
213                    crate::health::HealthStatus::Healthy
214                } else {
215                    crate::health::HealthStatus::Unhealthy
216                }
217            });
218        }
219
220        Ok(Self {
221            client,
222            receiver,
223            shutdown_tx,
224            _server_handle: server_handle,
225            closed: AtomicBool::new(false),
226            healthy,
227            recv_timeout_ms: config.recv_timeout_ms,
228            #[cfg(feature = "metrics")]
229            inflight: AtomicU64::new(0),
230            filter_engine,
231            filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
232        })
233    }
234}
235
236impl TransportBase for GrpcTransport {
237    async fn close(&self) -> TransportResult<()> {
238        self.closed.store(true, Ordering::Relaxed);
239        self.healthy.store(false, Ordering::Relaxed);
240
241        // Signal server shutdown
242        // Note: we can't take from Option behind &self, so we use a flag
243        // The server task will complete when the oneshot is dropped
244        Ok(())
245    }
246
247    fn is_healthy(&self) -> bool {
248        let healthy = self.healthy.load(Ordering::Relaxed);
249        #[cfg(feature = "metrics")]
250        metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
251            1.0
252        } else {
253            0.0
254        });
255        healthy
256    }
257
258    fn name(&self) -> &'static str {
259        "grpc"
260    }
261}
262
263impl TransportSender for GrpcTransport {
264    async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
265        if self.closed.load(Ordering::Relaxed) {
266            return SendResult::Fatal(TransportError::Closed);
267        }
268
269        // Outbound filter check
270        if self.filter_engine.has_outbound_filters() {
271            match self.filter_engine.apply_outbound(payload) {
272                super::filter::FilterDisposition::Pass => {}
273                super::filter::FilterDisposition::Drop => return SendResult::Ok,
274                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
275            }
276        }
277
278        let Some(client) = &self.client else {
279            return SendResult::Fatal(TransportError::Config(
280                "no endpoint configured for sending".into(),
281            ));
282        };
283
284        let mut metadata = HashMap::new();
285        if !key.is_empty() {
286            metadata.insert("topic".to_string(), key.to_string());
287        }
288
289        // Inject W3C traceparent into gRPC metadata for distributed tracing
290        #[cfg(feature = "transport-trace")]
291        if let Some(tp) = super::propagation::current_traceparent() {
292            metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
293        }
294
295        let request = proto::PushRequest {
296            payload: payload.to_vec(),
297            format: proto::Format::Auto.into(),
298            metadata,
299        };
300
301        #[cfg(feature = "metrics")]
302        let start = std::time::Instant::now();
303
304        #[cfg(feature = "metrics")]
305        self.inflight.fetch_add(1, Ordering::Relaxed);
306
307        // tonic clients are cheaply cloneable (shared channel)
308        let result = match client.clone().push(request).await {
309            Ok(_) => {
310                #[cfg(feature = "metrics")]
311                metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
312                SendResult::Ok
313            }
314            Err(status) => match status.code() {
315                tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
316                    #[cfg(feature = "metrics")]
317                    metrics::counter!(
318                        "dfe_transport_backpressured_total",
319                        "transport" => "grpc"
320                    )
321                    .increment(1);
322                    SendResult::Backpressured
323                }
324                _ => {
325                    #[cfg(feature = "metrics")]
326                    metrics::counter!(
327                        "dfe_transport_send_errors_total",
328                        "transport" => "grpc"
329                    )
330                    .increment(1);
331                    SendResult::Fatal(TransportError::Send(status.message().to_string()))
332                }
333            },
334        };
335
336        #[cfg(feature = "metrics")]
337        {
338            self.inflight.fetch_sub(1, Ordering::Relaxed);
339            metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
340                .set(self.inflight.load(Ordering::Relaxed) as f64);
341            metrics::histogram!(
342                "dfe_transport_send_duration_seconds",
343                "transport" => "grpc"
344            )
345            .record(start.elapsed().as_secs_f64());
346        }
347
348        result
349    }
350}
351
352impl TransportReceiver for GrpcTransport {
353    type Token = GrpcToken;
354
355    async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
356        if self.closed.load(Ordering::Relaxed) {
357            return Err(TransportError::Closed);
358        }
359
360        let Some(receiver) = &self.receiver else {
361            return Err(TransportError::Config(
362                "no listen address configured for receiving".into(),
363            ));
364        };
365
366        let mut rx = receiver.lock().await;
367        let mut messages = Vec::with_capacity(max.min(100));
368
369        for _ in 0..max {
370            let result = if self.recv_timeout_ms == 0 {
371                // Non-blocking
372                match rx.try_recv() {
373                    Ok(msg) => Some(msg),
374                    Err(mpsc::error::TryRecvError::Empty) => break,
375                    Err(mpsc::error::TryRecvError::Disconnected) => {
376                        return Err(TransportError::Closed);
377                    }
378                }
379            } else if messages.is_empty() {
380                // First message: wait with timeout
381                match tokio::time::timeout(
382                    std::time::Duration::from_millis(self.recv_timeout_ms),
383                    rx.recv(),
384                )
385                .await
386                {
387                    Ok(Some(msg)) => Some(msg),
388                    Ok(None) => return Err(TransportError::Closed),
389                    Err(_) => break, // Timeout
390                }
391            } else {
392                // Subsequent: non-blocking drain
393                match rx.try_recv() {
394                    Ok(msg) => Some(msg),
395                    Err(_) => break,
396                }
397            };
398
399            if let Some(msg) = result {
400                messages.push(msg);
401            }
402        }
403
404        // Apply inbound filters: drop messages, stage DLQ entries
405        if self.filter_engine.has_inbound_filters() {
406            let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
407            messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
408                super::filter::FilterDisposition::Pass => true,
409                super::filter::FilterDisposition::Drop => false,
410                super::filter::FilterDisposition::Dlq => {
411                    staged_dlq.push(super::filter::FilteredDlqEntry {
412                        payload: msg.payload.clone(),
413                        key: msg.key.clone(),
414                        reason: "transport filter".to_string(),
415                    });
416                    false
417                }
418            });
419            if !staged_dlq.is_empty() {
420                self.filtered_dlq_buffer.lock().extend(staged_dlq);
421            }
422        }
423
424        Ok(messages)
425    }
426
427    fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
428        std::mem::take(&mut *self.filtered_dlq_buffer.lock())
429    }
430
431    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
432        // gRPC has no broker-side persistence -- commit is a no-op.
433        // Acknowledgement is implicit in the Push RPC response.
434        Ok(())
435    }
436}
437
438impl Drop for GrpcTransport {
439    fn drop(&mut self) {
440        // Take and send shutdown signal
441        if let Some(tx) = self.shutdown_tx.take() {
442            let _ = tx.send(());
443        }
444        // Server handle will be dropped, which aborts the task
445    }
446}
447
448// --- DFE Transport gRPC service implementation ---
449
450/// Internal service implementation that receives Push RPCs
451/// and forwards messages into the transport's mpsc channel.
452struct DfeTransportServiceImpl {
453    sender: mpsc::Sender<Message<GrpcToken>>,
454    sequence: Arc<AtomicU64>,
455}
456
457#[tonic::async_trait]
458impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
459    async fn push(
460        &self,
461        request: Request<proto::PushRequest>,
462    ) -> Result<Response<proto::PushResponse>, Status> {
463        let req = request.into_inner();
464        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
465
466        // Extract W3C traceparent from incoming gRPC metadata for distributed tracing
467        #[cfg(feature = "transport-trace")]
468        if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
469            && super::propagation::is_valid_traceparent(tp)
470        {
471            tracing::Span::current().record("traceparent", tp.as_str());
472        }
473
474        let format = PayloadFormat::detect(&req.payload);
475        let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
476
477        let msg = Message {
478            key,
479            payload: req.payload,
480            token: GrpcToken::new(seq),
481            timestamp_ms: None,
482            format,
483        };
484
485        match self.sender.try_send(msg) {
486            Ok(()) => {
487                #[cfg(feature = "metrics")]
488                {
489                    metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
490                        .increment(1);
491                    metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
492                        self.sender
493                            .max_capacity()
494                            .saturating_sub(self.sender.capacity()) as f64,
495                    );
496                }
497                Ok(Response::new(proto::PushResponse { accepted: 1 }))
498            }
499            Err(mpsc::error::TrySendError::Full(_)) => {
500                #[cfg(feature = "metrics")]
501                metrics::counter!(
502                    "dfe_transport_backpressured_total",
503                    "transport" => "grpc"
504                )
505                .increment(1);
506                Err(Status::resource_exhausted("receiver buffer full"))
507            }
508            Err(mpsc::error::TrySendError::Closed(_)) => {
509                #[cfg(feature = "metrics")]
510                metrics::counter!(
511                    "dfe_transport_refused_total",
512                    "transport" => "grpc"
513                )
514                .increment(1);
515                Err(Status::unavailable("receiver closed"))
516            }
517        }
518    }
519
520    async fn health_check(
521        &self,
522        _request: Request<proto::HealthCheckRequest>,
523    ) -> Result<Response<proto::HealthCheckResponse>, Status> {
524        Ok(Response::new(proto::HealthCheckResponse {
525            status: proto::ServingStatus::Serving.into(),
526        }))
527    }
528}
529
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn grpc_token_display() {
        let token = GrpcToken::new(42);
        assert_eq!(format!("{token}"), "grpc:42");

        let token = GrpcToken::with_source(7, Arc::from("peer-1"));
        assert_eq!(format!("{token}"), "grpc:peer-1:7");
    }

    #[test]
    fn grpc_config_defaults() {
        let config = GrpcConfig::default();
        assert!(config.listen.is_none());
        assert!(config.endpoint.is_none());
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
        assert_eq!(config.max_message_size, 16 * 1024 * 1024);
        assert!(!config.compression);
    }

    #[test]
    fn grpc_config_server() {
        let config = GrpcConfig::server("0.0.0.0:6000");
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:6000"));
        assert!(config.endpoint.is_none());
    }

    #[test]
    fn grpc_config_client() {
        let config = GrpcConfig::client("http://loader:6000");
        assert!(config.listen.is_none());
        assert_eq!(config.endpoint.as_deref(), Some("http://loader:6000"));
    }

    #[test]
    fn grpc_config_with_compression() {
        let config = GrpcConfig::server("0.0.0.0:6000").with_compression();
        assert!(config.compression);
    }

    #[tokio::test]
    async fn grpc_transport_client_only() {
        // Client-only transport (lazy connection, no server)
        let config = GrpcConfig::client("http://localhost:16000");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_some());
        assert!(transport.receiver.is_none());
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "grpc");

        // recv should error (no server)
        let result = transport.recv(10).await;
        assert!(result.is_err());

        // commit is always ok
        transport.commit(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn grpc_transport_server_only() {
        // Server-only transport (no client for sending). Port 0 asks the OS
        // for an ephemeral port, so parallel test runs never collide on a
        // fixed port number.
        let config = GrpcConfig::server("127.0.0.1:0");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_none());
        assert!(transport.receiver.is_some());
        assert!(transport.is_healthy());

        // send should error (no client)
        let result = transport.send("test", b"payload").await;
        assert!(result.is_fatal());

        // Close
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        // A closed transport refuses further use in both directions.
        assert!(transport.recv(10).await.is_err());
        assert!(transport.send("test", b"payload").await.is_fatal());
    }

    #[tokio::test]
    async fn grpc_transport_invalid_listen_address() {
        let config = GrpcConfig::server("not-an-address");
        assert!(GrpcTransport::new(&config).await.is_err());
    }
}