Skip to main content

hyperi_rustlib/transport/grpc/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/grpc/mod.rs
3// Purpose:   gRPC transport backend
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # gRPC Transport
10//!
11//! DFE native gRPC transport using tonic. Supports client mode (sending),
12//! server mode (receiving), or both.
13//!
14//! ## DFE Native Protocol
15//!
16//! Lightweight bulk bytes transfer via `dfe.transport.v1.DfeTransport/Push`.
17//! Payload is opaque bytes (JSON, MsgPack, or Arrow IPC) with a format hint.
18//!
19//! ## Vector Wire Protocol Compatibility (optional)
20//!
21//! When the `transport-grpc-vector-compat` feature is enabled and
22//! `GrpcConfig::vector_compat` is true, the server also accepts
23//! `vector.Vector/PushEvents` RPCs from legacy Vector sinks.
24//!
25//! ## Example
26//!
27//! ```rust,ignore
28//! use hyperi_rustlib::transport::{GrpcTransport, GrpcConfig, TransportReceiver};
29//!
30//! // Server mode (receive from remote senders)
31//! let config = GrpcConfig::server("0.0.0.0:6000");
32//! let transport = GrpcTransport::new(&config).await?;
33//!
34//! let records = transport.recv(100).await?.records;
35//! // commit is a no-op for gRPC (no persistence)
36//! transport.commit(&[]).await?;
37//! ```
38
39pub mod batch;
40pub mod config;
41pub mod proto;
42pub mod token;
43
44pub use config::GrpcConfig;
45pub use token::GrpcToken;
46
47use super::error::{TransportError, TransportResult};
48use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
49use super::types::{Message, PayloadFormat, SendResult};
50use super::work_batch::{Record, WorkBatch};
51use std::collections::HashMap;
52use std::sync::Arc;
53use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
54use tokio::sync::{mpsc, oneshot};
55use tonic::{Request, Response, Status};
56
/// gRPC transport for DFE inter-service communication.
///
/// Implements both `TransportSender` and `TransportReceiver`, so it also
/// satisfies the unified `Transport` trait via blanket impl.
///
/// Which halves are active is decided at construction: `client` is populated
/// when an endpoint is configured, `receiver` (plus the server task fields)
/// when a listen address is configured.
pub struct GrpcTransport {
    /// Client for sending (None if server-only mode).
    client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,

    /// Receiver channel (None if client-only mode). Wrapped in an async mutex
    /// because `recv(&self)` needs exclusive access to the channel across
    /// `.await` points.
    receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,

    /// Shutdown signal for the server task. Behind a `Mutex<Option<..>>` so
    /// `close(&self)` (not just `Drop`) can take and fire it.
    shutdown_tx: parking_lot::Mutex<Option<oneshot::Sender<()>>>,

    /// Server background task handle, held for the transport's lifetime.
    ///
    /// Note: dropping a tokio `JoinHandle` detaches the task, it does not
    /// abort it. The server is stopped by firing `shutdown_tx` (done by both
    /// `close()` and `Drop`), not by dropping this handle.
    _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,

    /// Whether the transport is closed. Set by `close()`; checked at the top
    /// of every send / recv call.
    closed: AtomicBool,

    /// Shared healthy flag -- read by health registry closure, written by close().
    healthy: Arc<AtomicBool>,

    /// Receive timeout (milliseconds). `0` makes `recv` non-blocking.
    recv_timeout_ms: u64,

    /// Per-RPC send deadline (milliseconds, 0 = none).
    send_timeout_ms: u64,

    /// tonic's max encoded message size. A batch over this fails the RPC with
    /// an unretryable `OutOfRange`; we pre-check against it to reject early with
    /// an actionable error instead.
    max_message_size: usize,

    /// In-flight send count (for metrics).
    #[cfg(feature = "metrics")]
    inflight: AtomicU64,

    /// Transport-level message filter engine. Consulted for outbound payloads
    /// in the send paths and for inbound messages in `recv`.
    filter_engine: super::filter::TransportFilterEngine,
}
99
100/// Build a tonic `ClientTlsConfig` from the unified TLS fields on `GrpcConfig`.
101///
102/// tonic owns its TLS stack (like librdkafka), so map `TlsTrust` onto
103/// `ClientTlsConfig`: private-CA PEM (else OS native roots), optional SNI
104/// override, optional mTLS client identity.
105fn build_grpc_client_tls(
106    config: &GrpcConfig,
107) -> TransportResult<tonic::transport::ClientTlsConfig> {
108    use tonic::transport::{Certificate, ClientTlsConfig, Identity};
109
110    let mut tls = ClientTlsConfig::new();
111
112    if let Some(ref ca) = config.tls_ca_path {
113        let pem = std::fs::read(ca)
114            .map_err(|e| TransportError::Config(format!("gRPC TLS: cannot read ca {ca}: {e}")))?;
115        tls = tls.ca_certificate(Certificate::from_pem(pem));
116    } else {
117        // No private CA -> trust the OS native roots.
118        tls = tls.with_native_roots();
119    }
120
121    if let Some(ref domain) = config.tls_domain {
122        tls = tls.domain_name(domain.clone());
123    }
124
125    // mTLS identity -- both cert and key, or neither.
126    match (&config.tls_client_cert_path, &config.tls_client_key_path) {
127        (Some(cert), Some(key)) => {
128            let cert_pem = std::fs::read(cert).map_err(|e| {
129                TransportError::Config(format!("gRPC TLS: cannot read client cert {cert}: {e}"))
130            })?;
131            let key_pem = std::fs::read(key).map_err(|e| {
132                TransportError::Config(format!("gRPC TLS: cannot read client key {key}: {e}"))
133            })?;
134            tls = tls.identity(Identity::from_pem(cert_pem, key_pem));
135        }
136        (None, None) => {}
137        _ => {
138            return Err(TransportError::Config(
139                "gRPC TLS: mTLS requires BOTH tls_client_cert_path and tls_client_key_path"
140                    .to_string(),
141            ));
142        }
143    }
144
145    Ok(tls)
146}
147
148impl GrpcTransport {
149    /// Create a new gRPC transport.
150    ///
151    /// # Configuration
152    ///
153    /// - Set `config.listen` to start a gRPC server (receive mode).
154    /// - Set `config.endpoint` to connect to a remote server (send mode).
155    /// - Set both for bidirectional communication.
156    ///
157    /// # Errors
158    ///
159    /// Returns error if the listen address is invalid or the server fails to start.
160    pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
161        Self::new_inner(
162            config,
163            #[cfg(feature = "governor")]
164            None,
165        )
166        .await
167    }
168
169    /// Create a gRPC transport bound to a pressure governor (G3, `governor`
170    /// feature).
171    ///
172    /// Like [`new`](Self::new), but the receive server consults `pressure`
173    /// before enqueuing each inbound Push / batch record: while
174    /// [`UnifiedPressure::should_hold`](crate::governor::UnifiedPressure::should_hold)
175    /// holds, the RPC is rejected with `Status::unavailable` (the gRPC analogue
176    /// of HTTP 503). `None` is equivalent to [`new`](Self::new).
177    ///
178    /// # Errors
179    ///
180    /// Same as [`new`](Self::new).
181    #[cfg(feature = "governor")]
182    pub async fn with_pressure(
183        config: &GrpcConfig,
184        pressure: Option<Arc<crate::governor::UnifiedPressure>>,
185    ) -> TransportResult<Self> {
186        Self::new_inner(config, pressure).await
187    }
188
189    async fn new_inner(
190        config: &GrpcConfig,
191        #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
192    ) -> TransportResult<Self> {
193        let mut client = None;
194        let mut receiver = None;
195        let mut shutdown_tx = None;
196        let mut server_handle = None;
197        let sequence = Arc::new(AtomicU64::new(0));
198
199        // Set up client (lazy connection -- doesn't fail until first RPC)
200        if let Some(endpoint) = &config.endpoint {
201            let mut ep = tonic::transport::Channel::from_shared(endpoint.clone())
202                .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?;
203
204            // Client TLS. tonic owns its TLS stack, so we map the unified
205            // vocabulary onto ClientTlsConfig (private CA, mTLS identity, SNI).
206            if config.tls_enabled {
207                ep = ep
208                    .tls_config(build_grpc_client_tls(config)?)
209                    .map_err(|e| TransportError::Config(format!("gRPC TLS config: {e}")))?;
210            }
211
212            let channel = ep.connect_lazy();
213
214            let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
215                .max_decoding_message_size(config.max_message_size)
216                .max_encoding_message_size(config.max_message_size);
217
218            if config.compression {
219                c = c
220                    .send_compressed(tonic::codec::CompressionEncoding::Gzip)
221                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
222            }
223
224            client = Some(c);
225        }
226
227        // Set up server
228        if let Some(listen) = &config.listen {
229            let addr: std::net::SocketAddr = listen
230                .parse()
231                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
232
233            let (tx, rx) = mpsc::channel(config.recv_buffer_size);
234            let (sd_tx, sd_rx) = oneshot::channel();
235
236            // DFE native service
237            let dfe_svc = DfeTransportServiceImpl {
238                sender: tx.clone(),
239                sequence: sequence.clone(),
240                #[cfg(feature = "metrics")]
241                server_inflight: Arc::new(AtomicU64::new(0)),
242                #[cfg(feature = "governor")]
243                pressure: pressure.clone(),
244            };
245
246            let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
247                .max_decoding_message_size(config.max_message_size)
248                .max_encoding_message_size(config.max_message_size)
249                .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
250                .send_compressed(tonic::codec::CompressionEncoding::Gzip);
251
252            // Build server with optional Vector compat
253            let mut builder = tonic::transport::Server::builder();
254
255            #[cfg(feature = "transport-grpc-vector-compat")]
256            let router = if config.vector_compat {
257                let vector_svc =
258                    super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
259                let vector_server =
260                    super::vector_compat::proto::vector::vector_server::VectorServer::new(
261                        vector_svc,
262                    )
263                    .max_decoding_message_size(config.max_message_size)
264                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
265                    .send_compressed(tonic::codec::CompressionEncoding::Gzip);
266
267                builder.add_service(dfe_server).add_service(vector_server)
268            } else {
269                builder.add_service(dfe_server)
270            };
271
272            #[cfg(not(feature = "transport-grpc-vector-compat"))]
273            let router = builder.add_service(dfe_server);
274
275            // Bind the listener synchronously BEFORE spawning the serve task,
276            // so `new()` returning is a true readiness signal -- callers connect
277            // immediately, no polling. Binding inside the spawned task let
278            // `new()` return before the socket existed.
279            let listener = tokio::net::TcpListener::bind(addr)
280                .await
281                .map_err(|e| TransportError::Config(format!("failed to bind {addr}: {e}")))?;
282            let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
283
284            let handle = tokio::spawn(async move {
285                router
286                    .serve_with_incoming_shutdown(incoming, async {
287                        sd_rx.await.ok();
288                    })
289                    .await
290            });
291
292            receiver = Some(tokio::sync::Mutex::new(rx));
293            shutdown_tx = Some(sd_tx);
294            server_handle = Some(handle);
295        } else {
296            // No receive server -> nothing to attach the governor to. Consume
297            // it to silence the unused-variable warning.
298            #[cfg(feature = "governor")]
299            let _ = pressure;
300        }
301
302        let healthy = Arc::new(AtomicBool::new(true));
303
304        let filter_engine = super::filter::TransportFilterEngine::new(
305            &config.filters_in,
306            &config.filters_out,
307            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
308        )?;
309
310        #[cfg(feature = "health")]
311        {
312            let h = Arc::clone(&healthy);
313            crate::health::HealthRegistry::register("transport:grpc", move || {
314                if h.load(Ordering::Relaxed) {
315                    crate::health::HealthStatus::Healthy
316                } else {
317                    crate::health::HealthStatus::Unhealthy
318                }
319            });
320        }
321
322        Ok(Self {
323            client,
324            receiver,
325            shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
326            _server_handle: server_handle,
327            closed: AtomicBool::new(false),
328            healthy,
329            recv_timeout_ms: config.recv_timeout_ms,
330            send_timeout_ms: config.send_timeout_ms,
331            max_message_size: config.max_message_size,
332            #[cfg(feature = "metrics")]
333            inflight: AtomicU64::new(0),
334            filter_engine,
335        })
336    }
337}
338
339impl TransportSender for GrpcTransport {
340    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
341        if self.closed.load(Ordering::Relaxed) {
342            return SendResult::Fatal(TransportError::Closed);
343        }
344
345        // Outbound filter check
346        if self.filter_engine.has_outbound_filters() {
347            match self.filter_engine.apply_outbound(&payload) {
348                super::filter::FilterDisposition::Pass => {}
349                super::filter::FilterDisposition::Drop => return SendResult::Ok,
350                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
351            }
352        }
353
354        let Some(client) = &self.client else {
355            return SendResult::Fatal(TransportError::Config(
356                "no endpoint configured for sending".into(),
357            ));
358        };
359
360        let mut metadata = HashMap::new();
361        if !key.is_empty() {
362            metadata.insert("topic".to_string(), key.to_string());
363        }
364
365        // Inject W3C traceparent into gRPC metadata.
366        #[cfg(feature = "transport-trace")]
367        if let Some(tp) = super::propagation::current_traceparent() {
368            metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
369        }
370
371        let mut request = tonic::Request::new(proto::PushRequest {
372            // proto field is `Bytes` (`.bytes(".")` in build.rs) -- move, no copy.
373            payload,
374            format: proto::Format::Auto.into(),
375            metadata,
376        });
377
378        // Bound the RPC so a hung/black-holing server cannot wedge the sender
379        // task forever. Sent as the grpc-timeout header.
380        if self.send_timeout_ms > 0 {
381            request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
382        }
383
384        #[cfg(feature = "metrics")]
385        let start = std::time::Instant::now();
386
387        #[cfg(feature = "metrics")]
388        self.inflight.fetch_add(1, Ordering::Relaxed);
389
390        // tonic clients are cheaply cloneable (shared channel)
391        let result = match client.clone().push(request).await {
392            Ok(_) => {
393                #[cfg(feature = "metrics")]
394                metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
395                SendResult::Ok
396            }
397            Err(status) => match status.code() {
398                // Transient -- backpressure so the caller retries, not drop.
399                // DeadlineExceeded = send_timeout_ms fired (slow/hung server).
400                tonic::Code::Unavailable
401                | tonic::Code::ResourceExhausted
402                | tonic::Code::DeadlineExceeded => {
403                    #[cfg(feature = "metrics")]
404                    metrics::counter!(
405                        "dfe_transport_backpressured_total",
406                        "transport" => "grpc"
407                    )
408                    .increment(1);
409                    SendResult::Backpressured
410                }
411                _ => {
412                    #[cfg(feature = "metrics")]
413                    metrics::counter!(
414                        "dfe_transport_send_errors_total",
415                        "transport" => "grpc"
416                    )
417                    .increment(1);
418                    SendResult::Fatal(TransportError::Send(status.message().to_string()))
419                }
420            },
421        };
422
423        #[cfg(feature = "metrics")]
424        {
425            self.inflight.fetch_sub(1, Ordering::Relaxed);
426            metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
427                .set(self.inflight.load(Ordering::Relaxed) as f64);
428            metrics::histogram!(
429                "dfe_transport_send_duration_seconds",
430                "transport" => "grpc"
431            )
432            .record(start.elapsed().as_secs_f64());
433        }
434
435        result
436    }
437
438    /// Send a whole batch of records in ONE `RouteBatch` RPC (Task 0.6).
439    ///
440    /// Native batch override of [`TransportSender::send_batch`]: serde-less
441    /// rustlib<->rustlib transfer. Records map to a proto
442    /// [`Batch`](proto::Batch) via [`batch::records_to_proto`]; payloads travel
443    /// as OPAQUE `bytes`, the JSON / MsgPack codec is NEVER invoked in transit.
444    ///
445    /// Commit tokens and inline-DLQ entries are NOT sent -- the SENDER's local
446    /// concern. Pass the records (e.g. `&workbatch.records`); the caller fires
447    /// its commit tokens locally after this returns `Ok`.
448    ///
449    /// ## Atomic (all-or-nothing) acceptance
450    ///
451    /// The server reserves channel capacity for the WHOLE batch (one
452    /// `try_reserve_many`) before enqueuing any record -- no partial-send
453    /// window. `Backpressured` means ZERO records were admitted, so the caller
454    /// retries the whole block (at-least-once) with no duplicate prefix.
455    ///
456    /// # Errors / result
457    ///
458    /// Returns a [`SendResult`]. `Backpressured` maps the same transient gRPC
459    /// codes as [`send`](TransportSender::send).
460    async fn send_batch(&self, records: &[Record]) -> SendResult {
461        if self.closed.load(Ordering::Relaxed) {
462            return SendResult::Fatal(TransportError::Closed);
463        }
464
465        let Some(client) = &self.client else {
466            return SendResult::Fatal(TransportError::Config(
467                "no endpoint configured for sending".into(),
468            ));
469        };
470
471        // Apply outbound filters BEFORE the wire: a record matched by a `drop`
472        // or `dlq` filter must NOT be transmitted. This path once bypassed the
473        // filter entirely -- a record told to drop sailed through.
474        let to_send: Vec<Record> = if self.filter_engine.has_outbound_filters() {
475            let mut keep = Vec::with_capacity(records.len());
476            for r in records {
477                match self.filter_engine.apply_outbound(&r.payload) {
478                    super::filter::FilterDisposition::Pass => keep.push(r.clone()),
479                    super::filter::FilterDisposition::Drop
480                    | super::filter::FilterDisposition::Dlq => {}
481                }
482            }
483            keep
484        } else {
485            records.to_vec()
486        };
487
488        // Everything was filtered out -- nothing to send, batch is "done".
489        if to_send.is_empty() {
490            return SendResult::Ok;
491        }
492        let sent_count = to_send.len();
493
494        // Reject an oversized block early with an actionable error. Over
495        // max_message_size, tonic fails with an opaque OutOfRange that maps to
496        // Fatal and can never succeed on retry; the fix is a smaller block, so
497        // name the limit and point at the byte-budget lever. Payload-only bound
498        // (proto framing adds a little); tonic still backstops the margin.
499        let payload_bytes: usize = to_send.iter().map(|r| r.payload.len()).sum();
500        if payload_bytes > self.max_message_size {
501            #[cfg(feature = "metrics")]
502            metrics::counter!("dfe_transport_oversize_total", "transport" => "grpc").increment(1);
503            return SendResult::Fatal(TransportError::Config(format!(
504                "gRPC batch payload {payload_bytes} bytes exceeds max_message_size \
505                 {} -- lower the self-regulation byte budget below the gRPC limit",
506                self.max_message_size
507            )));
508        }
509
510        // Map records -> proto Batch. Payloads are MOVED (Bytes handle), opaque.
511        let proto_batch = batch::records_to_proto(to_send);
512
513        let mut request = tonic::Request::new(proto_batch);
514
515        // Inject W3C traceparent into gRPC metadata.
516        #[cfg(feature = "transport-trace")]
517        if let Some(tp) = super::propagation::current_traceparent()
518            && let Ok(val) = tp.parse()
519        {
520            request
521                .metadata_mut()
522                .insert(super::propagation::TRACEPARENT_HEADER, val);
523        }
524
525        if self.send_timeout_ms > 0 {
526            request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
527        }
528
529        #[cfg(feature = "metrics")]
530        let start = std::time::Instant::now();
531        #[cfg(feature = "metrics")]
532        self.inflight.fetch_add(1, Ordering::Relaxed);
533
534        let result = match client.clone().route_batch(request).await {
535            Ok(response) => {
536                // Server is all-or-nothing today, but the proto permits partial
537                // acceptance. Treating ANY Ok as full success would fire every
538                // commit token while the receiver kept only a prefix -- silent
539                // loss. Require accepted == sent; a shortfall is retryable
540                // (at-least-once).
541                let accepted = response.into_inner().accepted;
542                if accepted < sent_count as u64 {
543                    #[cfg(feature = "metrics")]
544                    metrics::counter!(
545                        "dfe_transport_backpressured_total",
546                        "transport" => "grpc"
547                    )
548                    .increment(1);
549                    tracing::warn!(
550                        accepted,
551                        sent = sent_count,
552                        "gRPC RouteBatch partially accepted -- retrying whole block"
553                    );
554                    SendResult::Backpressured
555                } else {
556                    #[cfg(feature = "metrics")]
557                    metrics::counter!(
558                        "dfe_transport_sent_total",
559                        "transport" => "grpc",
560                        "path" => "batch"
561                    )
562                    .increment(sent_count as u64);
563                    SendResult::Ok
564                }
565            }
566            Err(status) => match status.code() {
567                tonic::Code::Unavailable
568                | tonic::Code::ResourceExhausted
569                | tonic::Code::DeadlineExceeded => {
570                    #[cfg(feature = "metrics")]
571                    metrics::counter!(
572                        "dfe_transport_backpressured_total",
573                        "transport" => "grpc"
574                    )
575                    .increment(1);
576                    SendResult::Backpressured
577                }
578                _ => {
579                    #[cfg(feature = "metrics")]
580                    metrics::counter!(
581                        "dfe_transport_send_errors_total",
582                        "transport" => "grpc"
583                    )
584                    .increment(1);
585                    SendResult::Fatal(TransportError::Send(status.message().to_string()))
586                }
587            },
588        };
589
590        #[cfg(feature = "metrics")]
591        {
592            self.inflight.fetch_sub(1, Ordering::Relaxed);
593            metrics::histogram!(
594                "dfe_transport_send_duration_seconds",
595                "transport" => "grpc"
596            )
597            .record(start.elapsed().as_secs_f64());
598        }
599
600        result
601    }
602}
603
604impl TransportBase for GrpcTransport {
605    async fn close(&self) -> TransportResult<()> {
606        self.closed.store(true, Ordering::Relaxed);
607        self.healthy.store(false, Ordering::Relaxed);
608
609        // Actually stop the server: fire the shutdown oneshot so
610        // serve_with_incoming_shutdown completes and the listener is freed.
611        // Idempotent -- a second close() (or Drop) finds None.
612        if let Some(tx) = self.shutdown_tx.lock().take() {
613            let _ = tx.send(());
614        }
615        Ok(())
616    }
617
618    fn is_healthy(&self) -> bool {
619        let healthy = self.healthy.load(Ordering::Relaxed);
620        #[cfg(feature = "metrics")]
621        metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
622            1.0
623        } else {
624            0.0
625        });
626        healthy
627    }
628
629    fn name(&self) -> &'static str {
630        "grpc"
631    }
632}
633
634impl TransportReceiver for GrpcTransport {
635    type Token = GrpcToken;
636
637    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
638        if self.closed.load(Ordering::Relaxed) {
639            return Err(TransportError::Closed);
640        }
641
642        let Some(receiver) = &self.receiver else {
643            return Err(TransportError::Config(
644                "no listen address configured for receiving".into(),
645            ));
646        };
647
648        let mut rx = receiver.lock().await;
649        let mut messages = Vec::with_capacity(max.min(100));
650
651        for _ in 0..max {
652            let result = if self.recv_timeout_ms == 0 {
653                // Non-blocking
654                match rx.try_recv() {
655                    Ok(msg) => Some(msg),
656                    Err(mpsc::error::TryRecvError::Empty) => break,
657                    Err(mpsc::error::TryRecvError::Disconnected) => {
658                        return Err(TransportError::Closed);
659                    }
660                }
661            } else if messages.is_empty() {
662                // First message: wait with timeout
663                match tokio::time::timeout(
664                    std::time::Duration::from_millis(self.recv_timeout_ms),
665                    rx.recv(),
666                )
667                .await
668                {
669                    Ok(Some(msg)) => Some(msg),
670                    Ok(None) => return Err(TransportError::Closed),
671                    Err(_) => break, // Timeout
672                }
673            } else {
674                // Subsequent: non-blocking drain
675                match rx.try_recv() {
676                    Ok(msg) => Some(msg),
677                    Err(_) => break,
678                }
679            };
680
681            if let Some(msg) = result {
682                messages.push(msg);
683            }
684        }
685
686        // Apply inbound filters via the shared partition helper; DLQ entries
687        // are returned in the RecvBatch for the caller to route onward.
688        let batch = self.filter_engine.partition_batch(
689            messages,
690            |m| m.payload.as_ref(),
691            |m| m.key.clone(),
692            |m| m.token.clone(),
693        );
694        let messages = batch.messages;
695        let dlq_entries = batch.dlq_entries;
696        let filtered_tokens = batch.filtered_tokens;
697
698        Ok(RecvBatch {
699            messages,
700            dlq_entries,
701            filtered_tokens,
702        }
703        .into())
704    }
705
706    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
707        // gRPC has no broker-side persistence -- commit is a no-op.
708        // Acknowledgement is implicit in the Push RPC response.
709        Ok(())
710    }
711}
712
713impl Drop for GrpcTransport {
714    fn drop(&mut self) {
715        // Fire the shutdown signal if close() didn't already (idempotent).
716        if let Some(tx) = self.shutdown_tx.lock().take() {
717            let _ = tx.send(());
718        }
719        // Server handle will be dropped, which aborts the task
720    }
721}
722
723// --- DFE Transport gRPC service implementation ---
724
/// RAII guard that keeps `dfe_grpc_server_inflight_requests` accurate across
/// every handler return path (including early `?`/`return` and panics): inc on
/// construction, dec + re-publish the gauge on drop. No `unsafe`.
///
/// Wraps a clone of the shared in-flight counter so the decrement in `Drop`
/// hits the same atomic that `enter` incremented.
#[cfg(feature = "metrics")]
struct InflightGuard(Arc<AtomicU64>);
730
#[cfg(feature = "metrics")]
impl InflightGuard {
    /// Register one more in-flight request on `counter`, publish the updated
    /// gauge, and return the guard whose drop undoes the increment.
    fn enter(counter: &Arc<AtomicU64>) -> Self {
        // fetch_add yields the value BEFORE the increment.
        let previous = counter.fetch_add(1, Ordering::Relaxed);
        let current = previous + 1;
        metrics::gauge!("dfe_grpc_server_inflight_requests").set(current as f64);
        Self(Arc::clone(counter))
    }
}
739
#[cfg(feature = "metrics")]
impl Drop for InflightGuard {
    /// Release this request's in-flight slot and publish the new gauge value.
    fn drop(&mut self) {
        // fetch_sub yields the value BEFORE the decrement; saturate so the
        // published figure never goes below zero.
        let before = self.0.fetch_sub(1, Ordering::Relaxed);
        let remaining = before.saturating_sub(1);
        metrics::gauge!("dfe_grpc_server_inflight_requests").set(remaining as f64);
    }
}
747
/// Internal service implementation that receives Push RPCs
/// and forwards messages into the transport's mpsc channel.
struct DfeTransportServiceImpl {
    /// Producer side of the transport's receive channel; handlers enqueue
    /// inbound messages here without waiting (`try_send`).
    sender: mpsc::Sender<Message<GrpcToken>>,
    /// Shared counter from which each inbound message draws the sequence
    /// number carried in its `GrpcToken`.
    sequence: Arc<AtomicU64>,
    /// Server-side in-flight request count, maintained directly (inc on RPC
    /// arrival, dec on completion). The push-originator scale signal (spec 5b):
    /// counter-subtraction (`started - handled`) is restart-skewed, so a live
    /// gauge is the correct shape. Drives `dfe_grpc_server_inflight_requests`.
    #[cfg(feature = "metrics")]
    server_inflight: Arc<AtomicU64>,
    /// Optional pressure governor (G3, `governor` feature). `None` -> handlers
    /// never consult it. `Some` rejects an inbound Push / batch record with
    /// `Status::unavailable` while [`UnifiedPressure::should_hold`] holds --
    /// pressure-driven shedding on top of the channel-full rejection.
    #[cfg(feature = "governor")]
    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
}
766
767#[tonic::async_trait]
768impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
769    async fn push(
770        &self,
771        request: Request<proto::PushRequest>,
772    ) -> Result<Response<proto::PushResponse>, Status> {
773        // Server-side in-flight gauge: held for the whole handler, dec on drop.
774        #[cfg(feature = "metrics")]
775        let _inflight = InflightGuard::enter(&self.server_inflight);
776
777        // G3 pressure shedding: reject before doing any work if the governor
778        // says hold. `unavailable` = the gRPC analogue of HTTP 503.
779        #[cfg(feature = "governor")]
780        if let Some(pressure) = &self.pressure
781            && pressure.should_hold()
782        {
783            #[cfg(feature = "metrics")]
784            {
785                metrics::counter!(
786                    "dfe_transport_backpressured_total",
787                    "transport" => "grpc",
788                    "reason" => "pressure"
789                )
790                .increment(1);
791                // New default (metrics audit): shed = UNAVAILABLE/ResourceExhausted.
792                metrics::counter!("dfe_grpc_server_shed_total").increment(1);
793            }
794            return Err(Status::unavailable("under pressure -- inbound held"));
795        }
796
797        let req = request.into_inner();
798        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
799
800        // Extract W3C traceparent from incoming gRPC metadata.
801        #[cfg(feature = "transport-trace")]
802        if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
803            && super::propagation::is_valid_traceparent(tp)
804        {
805            tracing::Span::current().record("traceparent", tp.as_str());
806        }
807
808        let format = PayloadFormat::detect(&req.payload);
809        let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
810
811        // `req.payload` is prost `Bytes` (`.bytes(".")`) -- zero-copy decode,
812        // so this is a move not a copy.
813        let msg = Message {
814            key,
815            payload: req.payload,
816            token: GrpcToken::new(seq),
817            timestamp_ms: None,
818            format,
819        };
820
821        match self.sender.try_send(msg) {
822            Ok(()) => {
823                #[cfg(feature = "metrics")]
824                {
825                    metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
826                        .increment(1);
827                    // New default (metrics audit): inbound throughput on the
828                    // SERVER side (was only emitted by file/pipe/redis).
829                    metrics::counter!("dfe_transport_received_total", "transport" => "grpc")
830                        .increment(1);
831                    metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
832                        self.sender
833                            .max_capacity()
834                            .saturating_sub(self.sender.capacity()) as f64,
835                    );
836                }
837                Ok(Response::new(proto::PushResponse { accepted: 1 }))
838            }
839            Err(mpsc::error::TrySendError::Full(_)) => {
840                #[cfg(feature = "metrics")]
841                {
842                    metrics::counter!(
843                        "dfe_transport_backpressured_total",
844                        "transport" => "grpc"
845                    )
846                    .increment(1);
847                    metrics::counter!("dfe_grpc_server_shed_total").increment(1);
848                }
849                Err(Status::resource_exhausted("receiver buffer full"))
850            }
851            Err(mpsc::error::TrySendError::Closed(_)) => {
852                #[cfg(feature = "metrics")]
853                {
854                    metrics::counter!(
855                        "dfe_transport_refused_total",
856                        "transport" => "grpc"
857                    )
858                    .increment(1);
859                    metrics::counter!("dfe_grpc_server_shed_total").increment(1);
860                }
861                Err(Status::unavailable("receiver closed"))
862            }
863        }
864    }
865
866    async fn route_batch(
867        &self,
868        request: Request<proto::Batch>,
869    ) -> Result<Response<proto::BatchAck>, Status> {
870        // Server-side in-flight gauge: held for the whole handler, dec on drop.
871        #[cfg(feature = "metrics")]
872        let _inflight = InflightGuard::enter(&self.server_inflight);
873
874        // G3 pressure shedding: reject the whole batch while pressure holds.
875        #[cfg(feature = "governor")]
876        if let Some(pressure) = &self.pressure
877            && pressure.should_hold()
878        {
879            #[cfg(feature = "metrics")]
880            {
881                metrics::counter!(
882                    "dfe_transport_backpressured_total",
883                    "transport" => "grpc",
884                    "reason" => "pressure"
885                )
886                .increment(1);
887                metrics::counter!("dfe_grpc_server_shed_total").increment(1);
888            }
889            return Err(Status::unavailable("under pressure -- inbound held"));
890        }
891
892        // Extract W3C traceparent BEFORE consuming the request body.
893        #[cfg(feature = "transport-trace")]
894        if let Some(tp) = request
895            .metadata()
896            .get(super::propagation::TRACEPARENT_HEADER)
897            .and_then(|v| v.to_str().ok())
898            && super::propagation::is_valid_traceparent(tp)
899        {
900            tracing::Span::current().record("traceparent", tp);
901        }
902
903        let proto_batch = request.into_inner();
904
905        // Decode proto Batch -> rustlib Records (payloads zero-copy `Bytes`,
906        // codec NOT invoked). Records fan into the SAME mpsc channel the
907        // single-message Push path uses, so recv() delivers them unchanged.
908        let records = batch::proto_batch_to_records(proto_batch);
909        let accepted = records.len() as u64;
910
911        // ATOMICITY: reserve channel capacity for the WHOLE batch via
912        // `try_reserve_many` BEFORE enqueuing ANY record. Cannot fit -> reject
913        // all-or-nothing, so a retry re-sends the full block with no
914        // partial-acceptance / duplicate window. A per-record `try_send` loop
915        // could enqueue some then fail mid-batch, stranding a prefix. An empty
916        // batch reserves zero permits (no-op).
917        let permits = match self.sender.try_reserve_many(records.len()) {
918            Ok(permits) => permits,
919            Err(mpsc::error::TrySendError::Full(())) => {
920                #[cfg(feature = "metrics")]
921                {
922                    metrics::counter!(
923                        "dfe_transport_backpressured_total",
924                        "transport" => "grpc"
925                    )
926                    .increment(1);
927                    metrics::counter!("dfe_grpc_server_shed_total").increment(1);
928                }
929                return Err(Status::resource_exhausted("receiver buffer full"));
930            }
931            Err(mpsc::error::TrySendError::Closed(())) => {
932                #[cfg(feature = "metrics")]
933                {
934                    metrics::counter!(
935                        "dfe_transport_refused_total",
936                        "transport" => "grpc"
937                    )
938                    .increment(1);
939                    metrics::counter!("dfe_grpc_server_shed_total").increment(1);
940                }
941                return Err(Status::unavailable("receiver closed"));
942            }
943        };
944
945        // Capacity now held for every record -- enqueuing is infallible.
946        for (permit, record) in permits.zip(records) {
947            let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
948            let format = record.metadata.format;
949            // Auto means the sender did not pin a format. Detect from the lead
950            // byte so the receiver gets a concrete hint; does NOT parse/decode.
951            let format = if format == PayloadFormat::Auto {
952                PayloadFormat::detect(&record.payload)
953            } else {
954                format
955            };
956
957            permit.send(Message {
958                key: record.key,
959                payload: record.payload,
960                token: GrpcToken::new(seq),
961                timestamp_ms: record.metadata.timestamp_ms,
962                format,
963            });
964        }
965
966        #[cfg(feature = "metrics")]
967        {
968            metrics::counter!(
969                "dfe_transport_sent_total",
970                "transport" => "grpc",
971                "path" => "batch"
972            )
973            .increment(accepted);
974            // New default (metrics audit): inbound throughput on the SERVER side.
975            metrics::counter!("dfe_transport_received_total", "transport" => "grpc")
976                .increment(accepted);
977        }
978
979        Ok(Response::new(proto::BatchAck { accepted }))
980    }
981
982    async fn health_check(
983        &self,
984        _request: Request<proto::HealthCheckRequest>,
985    ) -> Result<Response<proto::HealthCheckResponse>, Status> {
986        Ok(Response::new(proto::HealthCheckResponse {
987            status: proto::ServingStatus::Serving.into(),
988        }))
989    }
990}
991
992#[cfg(test)]
993mod tests {
994    use super::*;
995
996    #[test]
997    fn grpc_token_display() {
998        let token = GrpcToken::new(42);
999        assert_eq!(format!("{token}"), "grpc:42");
1000
1001        let token = GrpcToken::with_source(7, Arc::from("peer-1"));
1002        assert_eq!(format!("{token}"), "grpc:peer-1:7");
1003    }
1004
1005    #[test]
1006    fn grpc_config_defaults() {
1007        let config = GrpcConfig::default();
1008        assert!(config.listen.is_none());
1009        assert!(config.endpoint.is_none());
1010        assert_eq!(config.recv_buffer_size, 10_000);
1011        assert_eq!(config.recv_timeout_ms, 100);
1012        assert_eq!(config.send_timeout_ms, 30_000);
1013        assert_eq!(config.max_message_size, 16 * 1024 * 1024);
1014        assert!(!config.compression);
1015        assert!(!config.tls_enabled);
1016        assert!(config.tls_ca_path.is_none());
1017    }
1018
1019    #[test]
1020    fn grpc_client_tls_builds_with_private_ca_and_rejects_half_mtls() {
1021        use std::io::Write;
1022        let cert = rcgen::generate_simple_self_signed(vec!["grpc.test".to_string()]).unwrap();
1023        let mut ca = tempfile::NamedTempFile::new().unwrap();
1024        ca.write_all(cert.cert.pem().as_bytes()).unwrap();
1025        ca.flush().unwrap();
1026
1027        // Private CA + SNI -> builds.
1028        let cfg = GrpcConfig {
1029            endpoint: Some("https://peer:6000".to_string()),
1030            tls_enabled: true,
1031            tls_ca_path: Some(ca.path().to_string_lossy().into_owned()),
1032            tls_domain: Some("grpc.test".to_string()),
1033            ..Default::default()
1034        };
1035        assert!(build_grpc_client_tls(&cfg).is_ok());
1036
1037        // Half-configured mTLS (cert without key) -> error.
1038        let cfg = GrpcConfig {
1039            tls_enabled: true,
1040            tls_client_cert_path: Some(ca.path().to_string_lossy().into_owned()),
1041            tls_client_key_path: None,
1042            ..Default::default()
1043        };
1044        assert!(build_grpc_client_tls(&cfg).is_err());
1045    }
1046
1047    #[test]
1048    fn grpc_config_server() {
1049        let config = GrpcConfig::server("0.0.0.0:6000");
1050        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:6000"));
1051        assert!(config.endpoint.is_none());
1052    }
1053
1054    #[test]
1055    fn grpc_config_client() {
1056        let config = GrpcConfig::client("http://loader:6000");
1057        assert!(config.listen.is_none());
1058        assert_eq!(config.endpoint.as_deref(), Some("http://loader:6000"));
1059    }
1060
1061    #[tokio::test]
1062    async fn send_batch_rejects_oversize_block() {
1063        // Over max_message_size, reject with a clear Fatal naming the limit
1064        // BEFORE the RPC -- tonic would otherwise return an opaque OutOfRange
1065        // (also Fatal) that can never succeed on retry. The size check fires
1066        // before any connection, so no server is needed.
1067        let config = GrpcConfig::client("http://127.0.0.1:1").with_max_message_size(64);
1068        let transport = GrpcTransport::new(&config).await.unwrap();
1069        let rec = Record {
1070            payload: bytes::Bytes::from(vec![b'x'; 256]),
1071            key: None,
1072            headers: Vec::new(),
1073            metadata: crate::transport::work_batch::RecordMeta {
1074                timestamp_ms: None,
1075                format: PayloadFormat::Json,
1076            },
1077        };
1078        match transport.send_batch(&[rec]).await {
1079            SendResult::Fatal(e) => assert!(
1080                e.to_string().contains("max_message_size"),
1081                "error should name the limit, got: {e}"
1082            ),
1083            other => panic!("expected Fatal for oversize block, got {other:?}"),
1084        }
1085    }
1086
1087    #[test]
1088    fn grpc_config_with_compression() {
1089        let config = GrpcConfig::server("0.0.0.0:6000").with_compression();
1090        assert!(config.compression);
1091    }
1092
1093    #[tokio::test]
1094    async fn grpc_transport_client_only() {
1095        // Client-only transport (lazy connection, no server)
1096        let config = GrpcConfig::client("http://localhost:16000");
1097        let transport = GrpcTransport::new(&config).await.unwrap();
1098
1099        assert!(transport.client.is_some());
1100        assert!(transport.receiver.is_none());
1101        assert!(transport.is_healthy());
1102        assert_eq!(transport.name(), "grpc");
1103
1104        // recv should error (no server)
1105        let result = transport.recv(10).await;
1106        assert!(result.is_err());
1107
1108        // commit is always ok
1109        transport.commit(&[]).await.unwrap();
1110    }
1111
1112    /// G3: with a pressure governor pinned HIGH, the gRPC Push handler rejects
1113    /// with `Status::unavailable` (the gRPC analogue of 503). The default `new`
1114    /// (no governor) accepts as before.
1115    #[cfg(feature = "governor")]
1116    #[tokio::test]
1117    async fn grpc_pressure_high_rejects_unavailable() {
1118        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
1119        use crate::memory::{MemoryGuard, MemoryGuardConfig};
1120
1121        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
1122            limit_bytes: 1000,
1123            pressure_threshold: 0.80,
1124            ..Default::default()
1125        }));
1126        guard.add_bytes(950); // 95%
1127        let pressure = Arc::new(UnifiedPressure::new(
1128            vec![Arc::new(MemoryPressureSource::new(Arc::clone(&guard))) as Arc<dyn PressureSource>],
1129            Hysteresis::new(0.80, 0.65).expect("valid band"),
1130        ));
1131        assert!(pressure.should_hold(), "pinned-high governor must hold");
1132
1133        // Server bound to the governor.
1134        let server_cfg = GrpcConfig::server("127.0.0.1:16077");
1135        let server = GrpcTransport::with_pressure(&server_cfg, Some(Arc::clone(&pressure)))
1136            .await
1137            .unwrap();
1138        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1139
1140        // Client pushes -> rejected as backpressure (maps to Backpressured).
1141        let client_cfg = GrpcConfig::client("http://127.0.0.1:16077");
1142        let client = GrpcTransport::new(&client_cfg).await.unwrap();
1143        let result = client
1144            .send("events", bytes::Bytes::from_static(b"{\"x\":1}"))
1145            .await;
1146        assert!(
1147            matches!(result, SendResult::Backpressured),
1148            "push under pressure must surface as backpressure, got {result:?}"
1149        );
1150
1151        client.close().await.unwrap();
1152        server.close().await.unwrap();
1153    }
1154
1155    #[tokio::test]
1156    async fn grpc_transport_server_only() {
1157        // Server-only transport (no client for sending)
1158        // Note: port 0 may not work with tonic parse, use a specific port
1159        let config = GrpcConfig::server("127.0.0.1:16001");
1160        let transport = GrpcTransport::new(&config).await.unwrap();
1161
1162        assert!(transport.client.is_none());
1163        assert!(transport.receiver.is_some());
1164        assert!(transport.is_healthy());
1165
1166        // send should error (no client)
1167        let result = transport
1168            .send("test", bytes::Bytes::from_static(b"payload"))
1169            .await;
1170        assert!(result.is_fatal());
1171
1172        // Close
1173        transport.close().await.unwrap();
1174        assert!(!transport.is_healthy());
1175    }
1176}