Skip to main content

hyperi_rustlib/transport/grpc/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/grpc/mod.rs
3// Purpose:   gRPC transport backend
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # gRPC Transport
10//!
11//! DFE native gRPC transport using tonic. Supports client mode (sending),
12//! server mode (receiving), or both.
13//!
14//! ## DFE Native Protocol
15//!
16//! Lightweight bulk bytes transfer via `dfe.transport.v1.DfeTransport/Push`.
17//! Payload is opaque bytes (JSON, MsgPack, or Arrow IPC) with a format hint.
18//!
19//! ## Vector Wire Protocol Compatibility (optional)
20//!
21//! When the `transport-grpc-vector-compat` feature is enabled and
22//! `GrpcConfig::vector_compat` is true, the server also accepts
23//! `vector.Vector/PushEvents` RPCs from legacy Vector sinks.
24//!
25//! ## Example
26//!
27//! ```rust,ignore
28//! use hyperi_rustlib::transport::{GrpcTransport, GrpcConfig, TransportReceiver};
29//!
30//! // Server mode (receive from remote senders)
31//! let config = GrpcConfig::server("0.0.0.0:6000");
32//! let transport = GrpcTransport::new(&config).await?;
33//!
34//! let records = transport.recv(100).await?.records;
35//! // commit is a no-op for gRPC (no persistence)
36//! transport.commit(&[]).await?;
37//! ```
38
39pub mod batch;
40pub mod config;
41pub mod proto;
42pub mod token;
43
44pub use config::GrpcConfig;
45pub use token::GrpcToken;
46
47use super::error::{TransportError, TransportResult};
48use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
49use super::types::{Message, PayloadFormat, SendResult};
50use super::work_batch::{Record, WorkBatch};
51use std::collections::HashMap;
52use std::sync::Arc;
53use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
54use tokio::sync::{mpsc, oneshot};
55use tonic::{Request, Response, Status};
56
/// gRPC transport for DFE inter-service communication.
///
/// Implements both `TransportSender` and `TransportReceiver`, so it also
/// satisfies the unified `Transport` trait via blanket impl.
///
/// Either half may be absent: `client` is only populated when an endpoint is
/// configured, `receiver` only when a listen address is configured.
pub struct GrpcTransport {
    /// Client for sending (None if server-only mode).
    client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,

    /// Receiver channel (None if client-only mode). Wrapped in an async mutex
    /// because `recv(&self)` needs mutable access to the receiver.
    receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,

    /// Shutdown signal for the server task. Behind a `Mutex<Option<..>>` so
    /// `close(&self)` (not just `Drop`) can take and fire it.
    shutdown_tx: parking_lot::Mutex<Option<oneshot::Sender<()>>>,

    /// Server background task handle. It is never awaited or aborted in this
    /// file. Note that dropping a tokio `JoinHandle` DETACHES the task rather
    /// than aborting it, so the server is actually stopped by the shutdown
    /// signal held in `shutdown_tx`, not by this handle going away.
    _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,

    /// Whether the transport is closed.
    closed: AtomicBool,

    /// Shared healthy flag -- read by health registry closure, written by close().
    healthy: Arc<AtomicBool>,

    /// Receive timeout (milliseconds). 0 makes `recv` fully non-blocking.
    recv_timeout_ms: u64,

    /// Per-RPC send deadline (milliseconds, 0 = none).
    send_timeout_ms: u64,

    /// tonic's max encoded message size. A batch over this fails the RPC with
    /// an unretryable `OutOfRange`; we pre-check against it to reject early with
    /// an actionable error instead.
    max_message_size: usize,

    /// In-flight send count (for metrics).
    #[cfg(feature = "metrics")]
    inflight: AtomicU64,

    /// Transport-level message filter engine.
    filter_engine: super::filter::TransportFilterEngine,
}
99
100/// Build a tonic `ClientTlsConfig` from the unified TLS fields on `GrpcConfig`.
101///
102/// tonic owns its TLS stack (like librdkafka), so map `TlsTrust` onto
103/// `ClientTlsConfig`: private-CA PEM (else OS native roots), optional SNI
104/// override, optional mTLS client identity.
105fn build_grpc_client_tls(
106    config: &GrpcConfig,
107) -> TransportResult<tonic::transport::ClientTlsConfig> {
108    use tonic::transport::{Certificate, ClientTlsConfig, Identity};
109
110    let mut tls = ClientTlsConfig::new();
111
112    if let Some(ref ca) = config.tls_ca_path {
113        let pem = std::fs::read(ca)
114            .map_err(|e| TransportError::Config(format!("gRPC TLS: cannot read ca {ca}: {e}")))?;
115        tls = tls.ca_certificate(Certificate::from_pem(pem));
116    } else {
117        // No private CA -> trust the OS native roots.
118        tls = tls.with_native_roots();
119    }
120
121    if let Some(ref domain) = config.tls_domain {
122        tls = tls.domain_name(domain.clone());
123    }
124
125    // mTLS identity -- both cert and key, or neither.
126    match (&config.tls_client_cert_path, &config.tls_client_key_path) {
127        (Some(cert), Some(key)) => {
128            let cert_pem = std::fs::read(cert).map_err(|e| {
129                TransportError::Config(format!("gRPC TLS: cannot read client cert {cert}: {e}"))
130            })?;
131            let key_pem = std::fs::read(key).map_err(|e| {
132                TransportError::Config(format!("gRPC TLS: cannot read client key {key}: {e}"))
133            })?;
134            tls = tls.identity(Identity::from_pem(cert_pem, key_pem));
135        }
136        (None, None) => {}
137        _ => {
138            return Err(TransportError::Config(
139                "gRPC TLS: mTLS requires BOTH tls_client_cert_path and tls_client_key_path"
140                    .to_string(),
141            ));
142        }
143    }
144
145    Ok(tls)
146}
147
148impl GrpcTransport {
149    /// Create a new gRPC transport.
150    ///
151    /// # Configuration
152    ///
153    /// - Set `config.listen` to start a gRPC server (receive mode).
154    /// - Set `config.endpoint` to connect to a remote server (send mode).
155    /// - Set both for bidirectional communication.
156    ///
157    /// # Errors
158    ///
159    /// Returns error if the listen address is invalid or the server fails to start.
160    pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
161        Self::new_inner(
162            config,
163            #[cfg(feature = "governor")]
164            None,
165        )
166        .await
167    }
168
169    /// Create a gRPC transport bound to a pressure governor (G3, `governor`
170    /// feature).
171    ///
172    /// Like [`new`](Self::new), but the receive server consults `pressure`
173    /// before enqueuing each inbound Push / batch record: while
174    /// [`UnifiedPressure::should_hold`](crate::governor::UnifiedPressure::should_hold)
175    /// holds, the RPC is rejected with `Status::unavailable` (the gRPC analogue
176    /// of HTTP 503). `None` is equivalent to [`new`](Self::new).
177    ///
178    /// # Errors
179    ///
180    /// Same as [`new`](Self::new).
181    #[cfg(feature = "governor")]
182    pub async fn with_pressure(
183        config: &GrpcConfig,
184        pressure: Option<Arc<crate::governor::UnifiedPressure>>,
185    ) -> TransportResult<Self> {
186        Self::new_inner(config, pressure).await
187    }
188
189    async fn new_inner(
190        config: &GrpcConfig,
191        #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
192    ) -> TransportResult<Self> {
193        let mut client = None;
194        let mut receiver = None;
195        let mut shutdown_tx = None;
196        let mut server_handle = None;
197        let sequence = Arc::new(AtomicU64::new(0));
198
199        // Set up client (lazy connection -- doesn't fail until first RPC)
200        if let Some(endpoint) = &config.endpoint {
201            let mut ep = tonic::transport::Channel::from_shared(endpoint.clone())
202                .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?;
203
204            // Client TLS. tonic owns its TLS stack, so we map the unified
205            // vocabulary onto ClientTlsConfig (private CA, mTLS identity, SNI).
206            if config.tls_enabled {
207                ep = ep
208                    .tls_config(build_grpc_client_tls(config)?)
209                    .map_err(|e| TransportError::Config(format!("gRPC TLS config: {e}")))?;
210            }
211
212            let channel = ep.connect_lazy();
213
214            let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
215                .max_decoding_message_size(config.max_message_size)
216                .max_encoding_message_size(config.max_message_size);
217
218            if config.compression {
219                c = c
220                    .send_compressed(tonic::codec::CompressionEncoding::Gzip)
221                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
222            }
223
224            client = Some(c);
225        }
226
227        // Set up server
228        if let Some(listen) = &config.listen {
229            let addr: std::net::SocketAddr = listen
230                .parse()
231                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
232
233            let (tx, rx) = mpsc::channel(config.recv_buffer_size);
234            let (sd_tx, sd_rx) = oneshot::channel();
235
236            // DFE native service
237            let dfe_svc = DfeTransportServiceImpl {
238                sender: tx.clone(),
239                sequence: sequence.clone(),
240                #[cfg(feature = "governor")]
241                pressure: pressure.clone(),
242            };
243
244            let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
245                .max_decoding_message_size(config.max_message_size)
246                .max_encoding_message_size(config.max_message_size)
247                .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
248                .send_compressed(tonic::codec::CompressionEncoding::Gzip);
249
250            // Build server with optional Vector compat
251            let mut builder = tonic::transport::Server::builder();
252
253            #[cfg(feature = "transport-grpc-vector-compat")]
254            let router = if config.vector_compat {
255                let vector_svc =
256                    super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
257                let vector_server =
258                    super::vector_compat::proto::vector::vector_server::VectorServer::new(
259                        vector_svc,
260                    )
261                    .max_decoding_message_size(config.max_message_size)
262                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
263                    .send_compressed(tonic::codec::CompressionEncoding::Gzip);
264
265                builder.add_service(dfe_server).add_service(vector_server)
266            } else {
267                builder.add_service(dfe_server)
268            };
269
270            #[cfg(not(feature = "transport-grpc-vector-compat"))]
271            let router = builder.add_service(dfe_server);
272
273            // Bind the listener synchronously BEFORE spawning the serve task,
274            // so `new()` returning is a true readiness signal -- callers connect
275            // immediately, no polling. Binding inside the spawned task let
276            // `new()` return before the socket existed.
277            let listener = tokio::net::TcpListener::bind(addr)
278                .await
279                .map_err(|e| TransportError::Config(format!("failed to bind {addr}: {e}")))?;
280            let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
281
282            let handle = tokio::spawn(async move {
283                router
284                    .serve_with_incoming_shutdown(incoming, async {
285                        sd_rx.await.ok();
286                    })
287                    .await
288            });
289
290            receiver = Some(tokio::sync::Mutex::new(rx));
291            shutdown_tx = Some(sd_tx);
292            server_handle = Some(handle);
293        } else {
294            // No receive server -> nothing to attach the governor to. Consume
295            // it to silence the unused-variable warning.
296            #[cfg(feature = "governor")]
297            let _ = pressure;
298        }
299
300        let healthy = Arc::new(AtomicBool::new(true));
301
302        let filter_engine = super::filter::TransportFilterEngine::new(
303            &config.filters_in,
304            &config.filters_out,
305            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
306        )?;
307
308        #[cfg(feature = "health")]
309        {
310            let h = Arc::clone(&healthy);
311            crate::health::HealthRegistry::register("transport:grpc", move || {
312                if h.load(Ordering::Relaxed) {
313                    crate::health::HealthStatus::Healthy
314                } else {
315                    crate::health::HealthStatus::Unhealthy
316                }
317            });
318        }
319
320        Ok(Self {
321            client,
322            receiver,
323            shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
324            _server_handle: server_handle,
325            closed: AtomicBool::new(false),
326            healthy,
327            recv_timeout_ms: config.recv_timeout_ms,
328            send_timeout_ms: config.send_timeout_ms,
329            max_message_size: config.max_message_size,
330            #[cfg(feature = "metrics")]
331            inflight: AtomicU64::new(0),
332            filter_engine,
333        })
334    }
335}
336
337impl TransportSender for GrpcTransport {
338    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
339        if self.closed.load(Ordering::Relaxed) {
340            return SendResult::Fatal(TransportError::Closed);
341        }
342
343        // Outbound filter check
344        if self.filter_engine.has_outbound_filters() {
345            match self.filter_engine.apply_outbound(&payload) {
346                super::filter::FilterDisposition::Pass => {}
347                super::filter::FilterDisposition::Drop => return SendResult::Ok,
348                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
349            }
350        }
351
352        let Some(client) = &self.client else {
353            return SendResult::Fatal(TransportError::Config(
354                "no endpoint configured for sending".into(),
355            ));
356        };
357
358        let mut metadata = HashMap::new();
359        if !key.is_empty() {
360            metadata.insert("topic".to_string(), key.to_string());
361        }
362
363        // Inject W3C traceparent into gRPC metadata.
364        #[cfg(feature = "transport-trace")]
365        if let Some(tp) = super::propagation::current_traceparent() {
366            metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
367        }
368
369        let mut request = tonic::Request::new(proto::PushRequest {
370            // proto field is `Bytes` (`.bytes(".")` in build.rs) -- move, no copy.
371            payload,
372            format: proto::Format::Auto.into(),
373            metadata,
374        });
375
376        // Bound the RPC so a hung/black-holing server cannot wedge the sender
377        // task forever. Sent as the grpc-timeout header.
378        if self.send_timeout_ms > 0 {
379            request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
380        }
381
382        #[cfg(feature = "metrics")]
383        let start = std::time::Instant::now();
384
385        #[cfg(feature = "metrics")]
386        self.inflight.fetch_add(1, Ordering::Relaxed);
387
388        // tonic clients are cheaply cloneable (shared channel)
389        let result = match client.clone().push(request).await {
390            Ok(_) => {
391                #[cfg(feature = "metrics")]
392                metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
393                SendResult::Ok
394            }
395            Err(status) => match status.code() {
396                // Transient -- backpressure so the caller retries, not drop.
397                // DeadlineExceeded = send_timeout_ms fired (slow/hung server).
398                tonic::Code::Unavailable
399                | tonic::Code::ResourceExhausted
400                | tonic::Code::DeadlineExceeded => {
401                    #[cfg(feature = "metrics")]
402                    metrics::counter!(
403                        "dfe_transport_backpressured_total",
404                        "transport" => "grpc"
405                    )
406                    .increment(1);
407                    SendResult::Backpressured
408                }
409                _ => {
410                    #[cfg(feature = "metrics")]
411                    metrics::counter!(
412                        "dfe_transport_send_errors_total",
413                        "transport" => "grpc"
414                    )
415                    .increment(1);
416                    SendResult::Fatal(TransportError::Send(status.message().to_string()))
417                }
418            },
419        };
420
421        #[cfg(feature = "metrics")]
422        {
423            self.inflight.fetch_sub(1, Ordering::Relaxed);
424            metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
425                .set(self.inflight.load(Ordering::Relaxed) as f64);
426            metrics::histogram!(
427                "dfe_transport_send_duration_seconds",
428                "transport" => "grpc"
429            )
430            .record(start.elapsed().as_secs_f64());
431        }
432
433        result
434    }
435
436    /// Send a whole batch of records in ONE `RouteBatch` RPC (Task 0.6).
437    ///
438    /// Native batch override of [`TransportSender::send_batch`]: serde-less
439    /// rustlib<->rustlib transfer. Records map to a proto
440    /// [`Batch`](proto::Batch) via [`batch::records_to_proto`]; payloads travel
441    /// as OPAQUE `bytes`, the JSON / MsgPack codec is NEVER invoked in transit.
442    ///
443    /// Commit tokens and inline-DLQ entries are NOT sent -- the SENDER's local
444    /// concern. Pass the records (e.g. `&workbatch.records`); the caller fires
445    /// its commit tokens locally after this returns `Ok`.
446    ///
447    /// ## Atomic (all-or-nothing) acceptance
448    ///
449    /// The server reserves channel capacity for the WHOLE batch (one
450    /// `try_reserve_many`) before enqueuing any record -- no partial-send
451    /// window. `Backpressured` means ZERO records were admitted, so the caller
452    /// retries the whole block (at-least-once) with no duplicate prefix.
453    ///
454    /// # Errors / result
455    ///
456    /// Returns a [`SendResult`]. `Backpressured` maps the same transient gRPC
457    /// codes as [`send`](TransportSender::send).
458    async fn send_batch(&self, records: &[Record]) -> SendResult {
459        if self.closed.load(Ordering::Relaxed) {
460            return SendResult::Fatal(TransportError::Closed);
461        }
462
463        let Some(client) = &self.client else {
464            return SendResult::Fatal(TransportError::Config(
465                "no endpoint configured for sending".into(),
466            ));
467        };
468
469        // Apply outbound filters BEFORE the wire: a record matched by a `drop`
470        // or `dlq` filter must NOT be transmitted. This path once bypassed the
471        // filter entirely -- a record told to drop sailed through.
472        let to_send: Vec<Record> = if self.filter_engine.has_outbound_filters() {
473            let mut keep = Vec::with_capacity(records.len());
474            for r in records {
475                match self.filter_engine.apply_outbound(&r.payload) {
476                    super::filter::FilterDisposition::Pass => keep.push(r.clone()),
477                    super::filter::FilterDisposition::Drop
478                    | super::filter::FilterDisposition::Dlq => {}
479                }
480            }
481            keep
482        } else {
483            records.to_vec()
484        };
485
486        // Everything was filtered out -- nothing to send, batch is "done".
487        if to_send.is_empty() {
488            return SendResult::Ok;
489        }
490        let sent_count = to_send.len();
491
492        // Reject an oversized block early with an actionable error. Over
493        // max_message_size, tonic fails with an opaque OutOfRange that maps to
494        // Fatal and can never succeed on retry; the fix is a smaller block, so
495        // name the limit and point at the byte-budget lever. Payload-only bound
496        // (proto framing adds a little); tonic still backstops the margin.
497        let payload_bytes: usize = to_send.iter().map(|r| r.payload.len()).sum();
498        if payload_bytes > self.max_message_size {
499            #[cfg(feature = "metrics")]
500            metrics::counter!("dfe_transport_oversize_total", "transport" => "grpc").increment(1);
501            return SendResult::Fatal(TransportError::Config(format!(
502                "gRPC batch payload {payload_bytes} bytes exceeds max_message_size \
503                 {} -- lower the self-regulation byte budget below the gRPC limit",
504                self.max_message_size
505            )));
506        }
507
508        // Map records -> proto Batch. Payloads are MOVED (Bytes handle), opaque.
509        let proto_batch = batch::records_to_proto(to_send);
510
511        let mut request = tonic::Request::new(proto_batch);
512
513        // Inject W3C traceparent into gRPC metadata.
514        #[cfg(feature = "transport-trace")]
515        if let Some(tp) = super::propagation::current_traceparent()
516            && let Ok(val) = tp.parse()
517        {
518            request
519                .metadata_mut()
520                .insert(super::propagation::TRACEPARENT_HEADER, val);
521        }
522
523        if self.send_timeout_ms > 0 {
524            request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
525        }
526
527        #[cfg(feature = "metrics")]
528        let start = std::time::Instant::now();
529        #[cfg(feature = "metrics")]
530        self.inflight.fetch_add(1, Ordering::Relaxed);
531
532        let result = match client.clone().route_batch(request).await {
533            Ok(response) => {
534                // Server is all-or-nothing today, but the proto permits partial
535                // acceptance. Treating ANY Ok as full success would fire every
536                // commit token while the receiver kept only a prefix -- silent
537                // loss. Require accepted == sent; a shortfall is retryable
538                // (at-least-once).
539                let accepted = response.into_inner().accepted;
540                if accepted < sent_count as u64 {
541                    #[cfg(feature = "metrics")]
542                    metrics::counter!(
543                        "dfe_transport_backpressured_total",
544                        "transport" => "grpc"
545                    )
546                    .increment(1);
547                    tracing::warn!(
548                        accepted,
549                        sent = sent_count,
550                        "gRPC RouteBatch partially accepted -- retrying whole block"
551                    );
552                    SendResult::Backpressured
553                } else {
554                    #[cfg(feature = "metrics")]
555                    metrics::counter!(
556                        "dfe_transport_sent_total",
557                        "transport" => "grpc",
558                        "path" => "batch"
559                    )
560                    .increment(sent_count as u64);
561                    SendResult::Ok
562                }
563            }
564            Err(status) => match status.code() {
565                tonic::Code::Unavailable
566                | tonic::Code::ResourceExhausted
567                | tonic::Code::DeadlineExceeded => {
568                    #[cfg(feature = "metrics")]
569                    metrics::counter!(
570                        "dfe_transport_backpressured_total",
571                        "transport" => "grpc"
572                    )
573                    .increment(1);
574                    SendResult::Backpressured
575                }
576                _ => {
577                    #[cfg(feature = "metrics")]
578                    metrics::counter!(
579                        "dfe_transport_send_errors_total",
580                        "transport" => "grpc"
581                    )
582                    .increment(1);
583                    SendResult::Fatal(TransportError::Send(status.message().to_string()))
584                }
585            },
586        };
587
588        #[cfg(feature = "metrics")]
589        {
590            self.inflight.fetch_sub(1, Ordering::Relaxed);
591            metrics::histogram!(
592                "dfe_transport_send_duration_seconds",
593                "transport" => "grpc"
594            )
595            .record(start.elapsed().as_secs_f64());
596        }
597
598        result
599    }
600}
601
602impl TransportBase for GrpcTransport {
603    async fn close(&self) -> TransportResult<()> {
604        self.closed.store(true, Ordering::Relaxed);
605        self.healthy.store(false, Ordering::Relaxed);
606
607        // Actually stop the server: fire the shutdown oneshot so
608        // serve_with_incoming_shutdown completes and the listener is freed.
609        // Idempotent -- a second close() (or Drop) finds None.
610        if let Some(tx) = self.shutdown_tx.lock().take() {
611            let _ = tx.send(());
612        }
613        Ok(())
614    }
615
616    fn is_healthy(&self) -> bool {
617        let healthy = self.healthy.load(Ordering::Relaxed);
618        #[cfg(feature = "metrics")]
619        metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
620            1.0
621        } else {
622            0.0
623        });
624        healthy
625    }
626
627    fn name(&self) -> &'static str {
628        "grpc"
629    }
630}
631
632impl TransportReceiver for GrpcTransport {
633    type Token = GrpcToken;
634
635    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
636        if self.closed.load(Ordering::Relaxed) {
637            return Err(TransportError::Closed);
638        }
639
640        let Some(receiver) = &self.receiver else {
641            return Err(TransportError::Config(
642                "no listen address configured for receiving".into(),
643            ));
644        };
645
646        let mut rx = receiver.lock().await;
647        let mut messages = Vec::with_capacity(max.min(100));
648
649        for _ in 0..max {
650            let result = if self.recv_timeout_ms == 0 {
651                // Non-blocking
652                match rx.try_recv() {
653                    Ok(msg) => Some(msg),
654                    Err(mpsc::error::TryRecvError::Empty) => break,
655                    Err(mpsc::error::TryRecvError::Disconnected) => {
656                        return Err(TransportError::Closed);
657                    }
658                }
659            } else if messages.is_empty() {
660                // First message: wait with timeout
661                match tokio::time::timeout(
662                    std::time::Duration::from_millis(self.recv_timeout_ms),
663                    rx.recv(),
664                )
665                .await
666                {
667                    Ok(Some(msg)) => Some(msg),
668                    Ok(None) => return Err(TransportError::Closed),
669                    Err(_) => break, // Timeout
670                }
671            } else {
672                // Subsequent: non-blocking drain
673                match rx.try_recv() {
674                    Ok(msg) => Some(msg),
675                    Err(_) => break,
676                }
677            };
678
679            if let Some(msg) = result {
680                messages.push(msg);
681            }
682        }
683
684        // Apply inbound filters via the shared partition helper; DLQ entries
685        // are returned in the RecvBatch for the caller to route onward.
686        let batch = self.filter_engine.partition_batch(
687            messages,
688            |m| m.payload.as_ref(),
689            |m| m.key.clone(),
690            |m| m.token.clone(),
691        );
692        let messages = batch.messages;
693        let dlq_entries = batch.dlq_entries;
694        let filtered_tokens = batch.filtered_tokens;
695
696        Ok(RecvBatch {
697            messages,
698            dlq_entries,
699            filtered_tokens,
700        }
701        .into())
702    }
703
704    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
705        // gRPC has no broker-side persistence -- commit is a no-op.
706        // Acknowledgement is implicit in the Push RPC response.
707        Ok(())
708    }
709}
710
711impl Drop for GrpcTransport {
712    fn drop(&mut self) {
713        // Fire the shutdown signal if close() didn't already (idempotent).
714        if let Some(tx) = self.shutdown_tx.lock().take() {
715            let _ = tx.send(());
716        }
717        // Server handle will be dropped, which aborts the task
718    }
719}
720
721// --- DFE Transport gRPC service implementation ---
722
/// Internal service implementation that receives Push RPCs
/// and forwards messages into the transport's mpsc channel.
struct DfeTransportServiceImpl {
    /// Producer side of the receive channel; handlers enqueue with a
    /// non-blocking `try_send`, so a full buffer rejects the RPC instead of
    /// stalling it.
    sender: mpsc::Sender<Message<GrpcToken>>,
    /// Shared counter; each accepted Push takes the next value (`fetch_add`)
    /// as the sequence number of its `GrpcToken`.
    sequence: Arc<AtomicU64>,
    /// Optional pressure governor (G3, `governor` feature). `None` -> handlers
    /// never consult it. `Some` rejects an inbound Push / batch record with
    /// `Status::unavailable` while [`UnifiedPressure::should_hold`] holds --
    /// pressure-driven shedding on top of the channel-full rejection.
    #[cfg(feature = "governor")]
    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
}
735
736#[tonic::async_trait]
737impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
738    async fn push(
739        &self,
740        request: Request<proto::PushRequest>,
741    ) -> Result<Response<proto::PushResponse>, Status> {
742        // G3 pressure shedding: reject before doing any work if the governor
743        // says hold. `unavailable` = the gRPC analogue of HTTP 503.
744        #[cfg(feature = "governor")]
745        if let Some(pressure) = &self.pressure
746            && pressure.should_hold()
747        {
748            #[cfg(feature = "metrics")]
749            metrics::counter!(
750                "dfe_transport_backpressured_total",
751                "transport" => "grpc",
752                "reason" => "pressure"
753            )
754            .increment(1);
755            return Err(Status::unavailable("under pressure -- inbound held"));
756        }
757
758        let req = request.into_inner();
759        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
760
761        // Extract W3C traceparent from incoming gRPC metadata.
762        #[cfg(feature = "transport-trace")]
763        if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
764            && super::propagation::is_valid_traceparent(tp)
765        {
766            tracing::Span::current().record("traceparent", tp.as_str());
767        }
768
769        let format = PayloadFormat::detect(&req.payload);
770        let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
771
772        // `req.payload` is prost `Bytes` (`.bytes(".")`) -- zero-copy decode,
773        // so this is a move not a copy.
774        let msg = Message {
775            key,
776            payload: req.payload,
777            token: GrpcToken::new(seq),
778            timestamp_ms: None,
779            format,
780        };
781
782        match self.sender.try_send(msg) {
783            Ok(()) => {
784                #[cfg(feature = "metrics")]
785                {
786                    metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
787                        .increment(1);
788                    metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
789                        self.sender
790                            .max_capacity()
791                            .saturating_sub(self.sender.capacity()) as f64,
792                    );
793                }
794                Ok(Response::new(proto::PushResponse { accepted: 1 }))
795            }
796            Err(mpsc::error::TrySendError::Full(_)) => {
797                #[cfg(feature = "metrics")]
798                metrics::counter!(
799                    "dfe_transport_backpressured_total",
800                    "transport" => "grpc"
801                )
802                .increment(1);
803                Err(Status::resource_exhausted("receiver buffer full"))
804            }
805            Err(mpsc::error::TrySendError::Closed(_)) => {
806                #[cfg(feature = "metrics")]
807                metrics::counter!(
808                    "dfe_transport_refused_total",
809                    "transport" => "grpc"
810                )
811                .increment(1);
812                Err(Status::unavailable("receiver closed"))
813            }
814        }
815    }
816
817    async fn route_batch(
818        &self,
819        request: Request<proto::Batch>,
820    ) -> Result<Response<proto::BatchAck>, Status> {
821        // G3 pressure shedding: reject the whole batch while pressure holds.
822        #[cfg(feature = "governor")]
823        if let Some(pressure) = &self.pressure
824            && pressure.should_hold()
825        {
826            #[cfg(feature = "metrics")]
827            metrics::counter!(
828                "dfe_transport_backpressured_total",
829                "transport" => "grpc",
830                "reason" => "pressure"
831            )
832            .increment(1);
833            return Err(Status::unavailable("under pressure -- inbound held"));
834        }
835
836        // Extract W3C traceparent BEFORE consuming the request body.
837        #[cfg(feature = "transport-trace")]
838        if let Some(tp) = request
839            .metadata()
840            .get(super::propagation::TRACEPARENT_HEADER)
841            .and_then(|v| v.to_str().ok())
842            && super::propagation::is_valid_traceparent(tp)
843        {
844            tracing::Span::current().record("traceparent", tp);
845        }
846
847        let proto_batch = request.into_inner();
848
849        // Decode proto Batch -> rustlib Records (payloads zero-copy `Bytes`,
850        // codec NOT invoked). Records fan into the SAME mpsc channel the
851        // single-message Push path uses, so recv() delivers them unchanged.
852        let records = batch::proto_batch_to_records(proto_batch);
853        let accepted = records.len() as u64;
854
855        // ATOMICITY: reserve channel capacity for the WHOLE batch via
856        // `try_reserve_many` BEFORE enqueuing ANY record. Cannot fit -> reject
857        // all-or-nothing, so a retry re-sends the full block with no
858        // partial-acceptance / duplicate window. A per-record `try_send` loop
859        // could enqueue some then fail mid-batch, stranding a prefix. An empty
860        // batch reserves zero permits (no-op).
861        let permits = match self.sender.try_reserve_many(records.len()) {
862            Ok(permits) => permits,
863            Err(mpsc::error::TrySendError::Full(())) => {
864                #[cfg(feature = "metrics")]
865                metrics::counter!(
866                    "dfe_transport_backpressured_total",
867                    "transport" => "grpc"
868                )
869                .increment(1);
870                return Err(Status::resource_exhausted("receiver buffer full"));
871            }
872            Err(mpsc::error::TrySendError::Closed(())) => {
873                #[cfg(feature = "metrics")]
874                metrics::counter!(
875                    "dfe_transport_refused_total",
876                    "transport" => "grpc"
877                )
878                .increment(1);
879                return Err(Status::unavailable("receiver closed"));
880            }
881        };
882
883        // Capacity now held for every record -- enqueuing is infallible.
884        for (permit, record) in permits.zip(records) {
885            let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
886            let format = record.metadata.format;
887            // Auto means the sender did not pin a format. Detect from the lead
888            // byte so the receiver gets a concrete hint; does NOT parse/decode.
889            let format = if format == PayloadFormat::Auto {
890                PayloadFormat::detect(&record.payload)
891            } else {
892                format
893            };
894
895            permit.send(Message {
896                key: record.key,
897                payload: record.payload,
898                token: GrpcToken::new(seq),
899                timestamp_ms: record.metadata.timestamp_ms,
900                format,
901            });
902        }
903
904        #[cfg(feature = "metrics")]
905        metrics::counter!(
906            "dfe_transport_sent_total",
907            "transport" => "grpc",
908            "path" => "batch"
909        )
910        .increment(accepted);
911
912        Ok(Response::new(proto::BatchAck { accepted }))
913    }
914
915    async fn health_check(
916        &self,
917        _request: Request<proto::HealthCheckRequest>,
918    ) -> Result<Response<proto::HealthCheckResponse>, Status> {
919        Ok(Response::new(proto::HealthCheckResponse {
920            status: proto::ServingStatus::Serving.into(),
921        }))
922    }
923}
924
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` renders `grpc:<seq>`, or `grpc:<source>:<seq>` when the token
    /// carries a source.
    #[test]
    fn grpc_token_display() {
        let token = GrpcToken::new(42);
        assert_eq!(token.to_string(), "grpc:42");

        let token = GrpcToken::with_source(7, Arc::from("peer-1"));
        assert_eq!(token.to_string(), "grpc:peer-1:7");
    }

    /// Pins every default the config ships with.
    #[test]
    fn grpc_config_defaults() {
        let defaults = GrpcConfig::default();

        // Neither server nor client role is selected by default.
        assert!(defaults.listen.is_none());
        assert!(defaults.endpoint.is_none());

        // Buffering, timeouts and size limit.
        assert_eq!(defaults.recv_buffer_size, 10_000);
        assert_eq!(defaults.recv_timeout_ms, 100);
        assert_eq!(defaults.send_timeout_ms, 30_000);
        assert_eq!(defaults.max_message_size, 16 * 1024 * 1024);

        // Compression and TLS are opt-in.
        assert!(!defaults.compression);
        assert!(!defaults.tls_enabled);
        assert!(defaults.tls_ca_path.is_none());
    }

    /// Client TLS config builds from a CA file plus `tls_domain`, and is
    /// rejected when a client certificate is given without its key.
    #[test]
    fn grpc_client_tls_builds_with_private_ca_and_rejects_half_mtls() {
        use std::io::Write;

        // Throwaway self-signed certificate, written out as a PEM temp file.
        let generated = rcgen::generate_simple_self_signed(vec!["grpc.test".to_string()]).unwrap();
        let mut pem_file = tempfile::NamedTempFile::new().unwrap();
        pem_file
            .write_all(generated.cert.pem().as_bytes())
            .unwrap();
        pem_file.flush().unwrap();
        let pem_path = pem_file.path().to_string_lossy().into_owned();

        // CA path + domain override: expected to build.
        let with_private_ca = GrpcConfig {
            endpoint: Some("https://peer:6000".to_string()),
            tls_enabled: true,
            tls_ca_path: Some(pem_path.clone()),
            tls_domain: Some("grpc.test".to_string()),
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&with_private_ca).is_ok());

        // Client cert without a key is a half-configured mTLS setup: error.
        let cert_without_key = GrpcConfig {
            tls_enabled: true,
            tls_client_cert_path: Some(pem_path),
            tls_client_key_path: None,
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&cert_without_key).is_err());
    }

    /// `GrpcConfig::server` sets `listen` and leaves `endpoint` unset.
    #[test]
    fn grpc_config_server() {
        let server_cfg = GrpcConfig::server("0.0.0.0:6000");
        assert_eq!(server_cfg.listen.as_deref(), Some("0.0.0.0:6000"));
        assert!(server_cfg.endpoint.is_none());
    }

    /// `GrpcConfig::client` sets `endpoint` and leaves `listen` unset.
    #[test]
    fn grpc_config_client() {
        let client_cfg = GrpcConfig::client("http://loader:6000");
        assert!(client_cfg.listen.is_none());
        assert_eq!(client_cfg.endpoint.as_deref(), Some("http://loader:6000"));
    }

    /// A block larger than `max_message_size` must come back as `Fatal` with
    /// an error that names the limit. Nothing listens on the endpoint used
    /// here, so the test relies on the size check firing before any RPC is
    /// attempted.
    #[tokio::test]
    async fn send_batch_rejects_oversize_block() {
        let config = GrpcConfig::client("http://127.0.0.1:1").with_max_message_size(64);
        let transport = GrpcTransport::new(&config).await.unwrap();

        // 256-byte payload against a 64-byte limit.
        let oversize = Record {
            key: None,
            headers: Vec::new(),
            payload: bytes::Bytes::from(vec![b'x'; 256]),
            metadata: crate::transport::work_batch::RecordMeta {
                format: PayloadFormat::Json,
                timestamp_ms: None,
            },
        };

        let outcome = transport.send_batch(&[oversize]).await;
        match outcome {
            SendResult::Fatal(e) => assert!(
                e.to_string().contains("max_message_size"),
                "error should name the limit, got: {e}"
            ),
            other => panic!("expected Fatal for oversize block, got {other:?}"),
        }
    }

    /// `with_compression` switches the compression flag on.
    #[test]
    fn grpc_config_with_compression() {
        let compressed = GrpcConfig::server("0.0.0.0:6000").with_compression();
        assert!(compressed.compression);
    }

    /// A client-only transport has a client but no receiver: `recv` fails,
    /// `commit` still succeeds.
    #[tokio::test]
    async fn grpc_transport_client_only() {
        // No server is started; the test only builds the client side.
        let config = GrpcConfig::client("http://localhost:16000");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_some());
        assert!(transport.receiver.is_none());
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "grpc");

        // Without a server side there is nothing to receive from.
        assert!(transport.recv(10).await.is_err());

        // Committing an empty token list succeeds.
        transport.commit(&[]).await.unwrap();
    }

    /// G3: with a pressure governor pinned HIGH, a push from a client must
    /// surface as `SendResult::Backpressured`.
    #[cfg(feature = "governor")]
    #[tokio::test]
    async fn grpc_pressure_high_rejects_unavailable() {
        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        // Memory guard filled to 95% of its limit, above the 0.80 threshold.
        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1000,
            pressure_threshold: 0.80,
            ..Default::default()
        }));
        guard.add_bytes(950);

        let memory_source: Arc<dyn PressureSource> =
            Arc::new(MemoryPressureSource::new(Arc::clone(&guard)));
        let pressure = Arc::new(UnifiedPressure::new(
            vec![memory_source],
            Hysteresis::new(0.80, 0.65).expect("valid band"),
        ));
        assert!(pressure.should_hold(), "pinned-high governor must hold");

        // Server wired to the governor; give it a moment to start listening.
        let server_cfg = GrpcConfig::server("127.0.0.1:16077");
        let server = GrpcTransport::with_pressure(&server_cfg, Some(Arc::clone(&pressure)))
            .await
            .unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // A push from a plain client must come back as backpressure.
        let client_cfg = GrpcConfig::client("http://127.0.0.1:16077");
        let client = GrpcTransport::new(&client_cfg).await.unwrap();
        let payload = bytes::Bytes::from_static(b"{\"x\":1}");
        let result = client.send("events", payload).await;
        assert!(
            matches!(result, SendResult::Backpressured),
            "push under pressure must surface as backpressure, got {result:?}"
        );

        client.close().await.unwrap();
        server.close().await.unwrap();
    }

    /// A server-only transport has a receiver but no client: `send` is fatal,
    /// and `close` flips the health flag.
    #[tokio::test]
    async fn grpc_transport_server_only() {
        // Note: port 0 may not work with tonic parse, use a specific port
        let config = GrpcConfig::server("127.0.0.1:16001");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_none());
        assert!(transport.receiver.is_some());
        assert!(transport.is_healthy());

        // No client configured, so sending cannot succeed.
        let payload = bytes::Bytes::from_static(b"payload");
        let result = transport.send("test", payload).await;
        assert!(result.is_fatal());

        // After close the transport no longer reports healthy.
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
    }
}