Skip to main content

hyperi_rustlib/transport/grpc/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/grpc/mod.rs
3// Purpose:   gRPC transport backend
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # gRPC Transport
10//!
11//! DFE native gRPC transport using tonic. Supports client mode (sending),
12//! server mode (receiving), or both.
13//!
14//! ## DFE Native Protocol
15//!
16//! Lightweight bulk bytes transfer via `dfe.transport.v1.DfeTransport/Push`.
17//! Payload is opaque bytes (JSON, MsgPack, or Arrow IPC) with a format hint.
18//!
19//! ## Vector Wire Protocol Compatibility (optional)
20//!
21//! When the `transport-grpc-vector-compat` feature is enabled and
22//! `GrpcConfig::vector_compat` is true, the server also accepts
23//! `vector.Vector/PushEvents` RPCs from legacy Vector sinks.
24//!
25//! ## Example
26//!
27//! ```rust,ignore
28//! use hyperi_rustlib::transport::{GrpcTransport, GrpcConfig, TransportReceiver};
29//!
30//! // Server mode (receive from remote senders)
31//! let config = GrpcConfig::server("0.0.0.0:6000");
32//! let transport = GrpcTransport::new(&config).await?;
33//!
34//! let messages = transport.recv(100).await?;
35//! // commit is a no-op for gRPC (no persistence)
36//! transport.commit(&[]).await?;
37//! ```
38
39pub mod config;
40pub mod proto;
41pub mod token;
42
43pub use config::GrpcConfig;
44pub use token::GrpcToken;
45
46use super::error::{TransportError, TransportResult};
47use super::traits::{TransportBase, TransportReceiver, TransportSender};
48use super::types::{Message, PayloadFormat, SendResult};
49use std::collections::HashMap;
50use std::sync::Arc;
51use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
52use tokio::sync::{mpsc, oneshot};
53use tonic::{Request, Response, Status};
54
55/// gRPC transport for DFE inter-service communication.
56///
57/// Implements both `TransportSender` and `TransportReceiver`, so it also
58/// satisfies the unified `Transport` trait via blanket impl.
59pub struct GrpcTransport {
60    /// Client for sending (None if server-only mode).
61    client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,
62
63    /// Receiver channel (None if client-only mode).
64    receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,
65
66    /// Shutdown signal for the server task.
67    shutdown_tx: Option<oneshot::Sender<()>>,
68
69    /// Server background task handle (kept alive, aborted on drop).
70    _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,
71
72    /// Whether the transport is closed.
73    closed: AtomicBool,
74
75    /// Shared healthy flag -- read by health registry closure, written by close().
76    healthy: Arc<AtomicBool>,
77
78    /// Receive timeout (milliseconds).
79    recv_timeout_ms: u64,
80
81    /// In-flight send count (for metrics).
82    #[cfg(feature = "metrics")]
83    inflight: AtomicU64,
84
85    /// Transport-level message filter engine.
86    filter_engine: super::filter::TransportFilterEngine,
87
88    /// Buffer for messages staged to DLQ by inbound filters.
89    /// Drained by `take_filtered_dlq_entries()`.
90    filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
91}
92
93impl GrpcTransport {
94    /// Create a new gRPC transport.
95    ///
96    /// # Configuration
97    ///
98    /// - Set `config.listen` to start a gRPC server (receive mode).
99    /// - Set `config.endpoint` to connect to a remote server (send mode).
100    /// - Set both for bidirectional communication.
101    ///
102    /// # Errors
103    ///
104    /// Returns error if the listen address is invalid or the server fails to start.
105    pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
106        let mut client = None;
107        let mut receiver = None;
108        let mut shutdown_tx = None;
109        let mut server_handle = None;
110        let sequence = Arc::new(AtomicU64::new(0));
111
112        // Set up client (lazy connection -- doesn't fail until first RPC)
113        if let Some(endpoint) = &config.endpoint {
114            let channel = tonic::transport::Channel::from_shared(endpoint.clone())
115                .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?
116                .connect_lazy();
117
118            let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
119                .max_decoding_message_size(config.max_message_size)
120                .max_encoding_message_size(config.max_message_size);
121
122            if config.compression {
123                c = c
124                    .send_compressed(tonic::codec::CompressionEncoding::Gzip)
125                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
126            }
127
128            client = Some(c);
129        }
130
131        // Set up server
132        if let Some(listen) = &config.listen {
133            let addr: std::net::SocketAddr = listen
134                .parse()
135                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
136
137            let (tx, rx) = mpsc::channel(config.recv_buffer_size);
138            let (sd_tx, sd_rx) = oneshot::channel();
139
140            // DFE native service
141            let dfe_svc = DfeTransportServiceImpl {
142                sender: tx.clone(),
143                sequence: sequence.clone(),
144            };
145
146            let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
147                .max_decoding_message_size(config.max_message_size)
148                .max_encoding_message_size(config.max_message_size)
149                .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
150                .send_compressed(tonic::codec::CompressionEncoding::Gzip);
151
152            // Build server with optional Vector compat
153            let mut builder = tonic::transport::Server::builder();
154
155            #[cfg(feature = "transport-grpc-vector-compat")]
156            let router = if config.vector_compat {
157                let vector_svc =
158                    super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
159                let vector_server =
160                    super::vector_compat::proto::vector::vector_server::VectorServer::new(
161                        vector_svc,
162                    )
163                    .max_decoding_message_size(config.max_message_size)
164                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
165                    .send_compressed(tonic::codec::CompressionEncoding::Gzip);
166
167                builder.add_service(dfe_server).add_service(vector_server)
168            } else {
169                builder.add_service(dfe_server)
170            };
171
172            #[cfg(not(feature = "transport-grpc-vector-compat"))]
173            let router = builder.add_service(dfe_server);
174
175            let handle = tokio::spawn(async move {
176                router
177                    .serve_with_shutdown(addr, async {
178                        sd_rx.await.ok();
179                    })
180                    .await
181            });
182
183            receiver = Some(tokio::sync::Mutex::new(rx));
184            shutdown_tx = Some(sd_tx);
185            server_handle = Some(handle);
186        }
187
188        let healthy = Arc::new(AtomicBool::new(true));
189
190        let filter_engine = super::filter::TransportFilterEngine::new(
191            &config.filters_in,
192            &config.filters_out,
193            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
194        )?;
195
196        #[cfg(feature = "health")]
197        {
198            let h = Arc::clone(&healthy);
199            crate::health::HealthRegistry::register("transport:grpc", move || {
200                if h.load(Ordering::Relaxed) {
201                    crate::health::HealthStatus::Healthy
202                } else {
203                    crate::health::HealthStatus::Unhealthy
204                }
205            });
206        }
207
208        Ok(Self {
209            client,
210            receiver,
211            shutdown_tx,
212            _server_handle: server_handle,
213            closed: AtomicBool::new(false),
214            healthy,
215            recv_timeout_ms: config.recv_timeout_ms,
216            #[cfg(feature = "metrics")]
217            inflight: AtomicU64::new(0),
218            filter_engine,
219            filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
220        })
221    }
222}
223
224impl TransportBase for GrpcTransport {
225    async fn close(&self) -> TransportResult<()> {
226        self.closed.store(true, Ordering::Relaxed);
227        self.healthy.store(false, Ordering::Relaxed);
228
229        // Signal server shutdown
230        // Note: we can't take from Option behind &self, so we use a flag
231        // The server task will complete when the oneshot is dropped
232        Ok(())
233    }
234
235    fn is_healthy(&self) -> bool {
236        let healthy = self.healthy.load(Ordering::Relaxed);
237        #[cfg(feature = "metrics")]
238        metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
239            1.0
240        } else {
241            0.0
242        });
243        healthy
244    }
245
246    fn name(&self) -> &'static str {
247        "grpc"
248    }
249}
250
251impl TransportSender for GrpcTransport {
252    async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
253        if self.closed.load(Ordering::Relaxed) {
254            return SendResult::Fatal(TransportError::Closed);
255        }
256
257        // Outbound filter check
258        if self.filter_engine.has_outbound_filters() {
259            match self.filter_engine.apply_outbound(payload) {
260                super::filter::FilterDisposition::Pass => {}
261                super::filter::FilterDisposition::Drop => return SendResult::Ok,
262                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
263            }
264        }
265
266        let Some(client) = &self.client else {
267            return SendResult::Fatal(TransportError::Config(
268                "no endpoint configured for sending".into(),
269            ));
270        };
271
272        let mut metadata = HashMap::new();
273        if !key.is_empty() {
274            metadata.insert("topic".to_string(), key.to_string());
275        }
276
277        // Inject W3C traceparent into gRPC metadata for distributed tracing
278        #[cfg(feature = "transport-trace")]
279        if let Some(tp) = super::propagation::current_traceparent() {
280            metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
281        }
282
283        let request = proto::PushRequest {
284            payload: payload.to_vec(),
285            format: proto::Format::Auto.into(),
286            metadata,
287        };
288
289        #[cfg(feature = "metrics")]
290        let start = std::time::Instant::now();
291
292        #[cfg(feature = "metrics")]
293        self.inflight.fetch_add(1, Ordering::Relaxed);
294
295        // tonic clients are cheaply cloneable (shared channel)
296        let result = match client.clone().push(request).await {
297            Ok(_) => {
298                #[cfg(feature = "metrics")]
299                metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
300                SendResult::Ok
301            }
302            Err(status) => match status.code() {
303                tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
304                    #[cfg(feature = "metrics")]
305                    metrics::counter!(
306                        "dfe_transport_backpressured_total",
307                        "transport" => "grpc"
308                    )
309                    .increment(1);
310                    SendResult::Backpressured
311                }
312                _ => {
313                    #[cfg(feature = "metrics")]
314                    metrics::counter!(
315                        "dfe_transport_send_errors_total",
316                        "transport" => "grpc"
317                    )
318                    .increment(1);
319                    SendResult::Fatal(TransportError::Send(status.message().to_string()))
320                }
321            },
322        };
323
324        #[cfg(feature = "metrics")]
325        {
326            self.inflight.fetch_sub(1, Ordering::Relaxed);
327            metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
328                .set(self.inflight.load(Ordering::Relaxed) as f64);
329            metrics::histogram!(
330                "dfe_transport_send_duration_seconds",
331                "transport" => "grpc"
332            )
333            .record(start.elapsed().as_secs_f64());
334        }
335
336        result
337    }
338}
339
340impl TransportReceiver for GrpcTransport {
341    type Token = GrpcToken;
342
343    async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
344        if self.closed.load(Ordering::Relaxed) {
345            return Err(TransportError::Closed);
346        }
347
348        let Some(receiver) = &self.receiver else {
349            return Err(TransportError::Config(
350                "no listen address configured for receiving".into(),
351            ));
352        };
353
354        let mut rx = receiver.lock().await;
355        let mut messages = Vec::with_capacity(max.min(100));
356
357        for _ in 0..max {
358            let result = if self.recv_timeout_ms == 0 {
359                // Non-blocking
360                match rx.try_recv() {
361                    Ok(msg) => Some(msg),
362                    Err(mpsc::error::TryRecvError::Empty) => break,
363                    Err(mpsc::error::TryRecvError::Disconnected) => {
364                        return Err(TransportError::Closed);
365                    }
366                }
367            } else if messages.is_empty() {
368                // First message: wait with timeout
369                match tokio::time::timeout(
370                    std::time::Duration::from_millis(self.recv_timeout_ms),
371                    rx.recv(),
372                )
373                .await
374                {
375                    Ok(Some(msg)) => Some(msg),
376                    Ok(None) => return Err(TransportError::Closed),
377                    Err(_) => break, // Timeout
378                }
379            } else {
380                // Subsequent: non-blocking drain
381                match rx.try_recv() {
382                    Ok(msg) => Some(msg),
383                    Err(_) => break,
384                }
385            };
386
387            if let Some(msg) = result {
388                messages.push(msg);
389            }
390        }
391
392        // Apply inbound filters: drop messages, stage DLQ entries
393        if self.filter_engine.has_inbound_filters() {
394            let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
395            messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
396                super::filter::FilterDisposition::Pass => true,
397                super::filter::FilterDisposition::Drop => false,
398                super::filter::FilterDisposition::Dlq => {
399                    staged_dlq.push(super::filter::FilteredDlqEntry {
400                        payload: msg.payload.clone(),
401                        key: msg.key.clone(),
402                        reason: "transport filter".to_string(),
403                    });
404                    false
405                }
406            });
407            if !staged_dlq.is_empty() {
408                self.filtered_dlq_buffer.lock().extend(staged_dlq);
409            }
410        }
411
412        Ok(messages)
413    }
414
415    fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
416        std::mem::take(&mut *self.filtered_dlq_buffer.lock())
417    }
418
419    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
420        // gRPC has no broker-side persistence -- commit is a no-op.
421        // Acknowledgement is implicit in the Push RPC response.
422        Ok(())
423    }
424}
425
426impl Drop for GrpcTransport {
427    fn drop(&mut self) {
428        // Take and send shutdown signal
429        if let Some(tx) = self.shutdown_tx.take() {
430            let _ = tx.send(());
431        }
432        // Server handle will be dropped, which aborts the task
433    }
434}
435
436// --- DFE Transport gRPC service implementation ---
437
438/// Internal service implementation that receives Push RPCs
439/// and forwards messages into the transport's mpsc channel.
440struct DfeTransportServiceImpl {
441    sender: mpsc::Sender<Message<GrpcToken>>,
442    sequence: Arc<AtomicU64>,
443}
444
445#[tonic::async_trait]
446impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
447    async fn push(
448        &self,
449        request: Request<proto::PushRequest>,
450    ) -> Result<Response<proto::PushResponse>, Status> {
451        let req = request.into_inner();
452        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
453
454        // Extract W3C traceparent from incoming gRPC metadata for distributed tracing
455        #[cfg(feature = "transport-trace")]
456        if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
457            && super::propagation::is_valid_traceparent(tp)
458        {
459            tracing::Span::current().record("traceparent", tp.as_str());
460        }
461
462        let format = PayloadFormat::detect(&req.payload);
463        let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
464
465        let msg = Message {
466            key,
467            payload: req.payload,
468            token: GrpcToken::new(seq),
469            timestamp_ms: None,
470            format,
471        };
472
473        match self.sender.try_send(msg) {
474            Ok(()) => {
475                #[cfg(feature = "metrics")]
476                {
477                    metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
478                        .increment(1);
479                    metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
480                        self.sender
481                            .max_capacity()
482                            .saturating_sub(self.sender.capacity()) as f64,
483                    );
484                }
485                Ok(Response::new(proto::PushResponse { accepted: 1 }))
486            }
487            Err(mpsc::error::TrySendError::Full(_)) => {
488                #[cfg(feature = "metrics")]
489                metrics::counter!(
490                    "dfe_transport_backpressured_total",
491                    "transport" => "grpc"
492                )
493                .increment(1);
494                Err(Status::resource_exhausted("receiver buffer full"))
495            }
496            Err(mpsc::error::TrySendError::Closed(_)) => {
497                #[cfg(feature = "metrics")]
498                metrics::counter!(
499                    "dfe_transport_refused_total",
500                    "transport" => "grpc"
501                )
502                .increment(1);
503                Err(Status::unavailable("receiver closed"))
504            }
505        }
506    }
507
508    async fn health_check(
509        &self,
510        _request: Request<proto::HealthCheckRequest>,
511    ) -> Result<Response<proto::HealthCheckResponse>, Status> {
512        Ok(Response::new(proto::HealthCheckResponse {
513            status: proto::ServingStatus::Serving.into(),
514        }))
515    }
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Tokens render as `grpc:<seq>`, or `grpc:<source>:<seq>` with a source.
    #[test]
    fn grpc_token_display() {
        let token = GrpcToken::new(42);
        assert_eq!(format!("{token}"), "grpc:42");

        let token = GrpcToken::with_source(7, Arc::from("peer-1"));
        assert_eq!(format!("{token}"), "grpc:peer-1:7");
    }

    /// Default configuration has neither side enabled.
    #[test]
    fn grpc_config_defaults() {
        let config = GrpcConfig::default();
        assert!(config.listen.is_none());
        assert!(config.endpoint.is_none());
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
        assert_eq!(config.max_message_size, 16 * 1024 * 1024);
        assert!(!config.compression);
    }

    #[test]
    fn grpc_config_server() {
        let config = GrpcConfig::server("0.0.0.0:6000");
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:6000"));
        assert!(config.endpoint.is_none());
    }

    #[test]
    fn grpc_config_client() {
        let config = GrpcConfig::client("http://loader:6000");
        assert!(config.listen.is_none());
        assert_eq!(config.endpoint.as_deref(), Some("http://loader:6000"));
    }

    #[test]
    fn grpc_config_with_compression() {
        let config = GrpcConfig::server("0.0.0.0:6000").with_compression();
        assert!(config.compression);
    }

    #[tokio::test]
    async fn grpc_transport_client_only() {
        // Client-only transport (lazy connection, no server)
        let config = GrpcConfig::client("http://localhost:16000");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_some());
        assert!(transport.receiver.is_none());
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "grpc");

        // recv should error (no server)
        let result = transport.recv(10).await;
        assert!(result.is_err());

        // commit is always ok
        transport.commit(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn grpc_transport_server_only() {
        // Server-only transport (no client for sending).
        // Port 0 parses as a valid `SocketAddr` and lets the OS pick a free
        // port, so parallel test runs cannot collide on a fixed port.
        let config = GrpcConfig::server("127.0.0.1:0");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_none());
        assert!(transport.receiver.is_some());
        assert!(transport.is_healthy());

        // send should error (no client)
        let result = transport.send("test", b"payload").await;
        assert!(result.is_fatal());

        // Close
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
    }

    #[tokio::test]
    async fn grpc_transport_rejects_invalid_listen_address() {
        // The listen string must parse as a socket address.
        let config = GrpcConfig::server("not-a-socket-address");
        assert!(GrpcTransport::new(&config).await.is_err());
    }
}