Skip to main content

hyperi_rustlib/transport/grpc/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/grpc/mod.rs
3// Purpose:   gRPC transport backend
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # gRPC Transport
10//!
11//! DFE native gRPC transport using tonic. Supports client mode (sending),
12//! server mode (receiving), or both.
13//!
14//! ## DFE Native Protocol
15//!
16//! Lightweight bulk bytes transfer via `dfe.transport.v1.DfeTransport/Push`.
17//! Payload is opaque bytes (JSON, MsgPack, or Arrow IPC) with a format hint.
18//!
19//! ## Vector Wire Protocol Compatibility (optional)
20//!
21//! When the `transport-grpc-vector-compat` feature is enabled and
22//! `GrpcConfig::vector_compat` is true, the server also accepts
23//! `vector.Vector/PushEvents` RPCs from legacy Vector sinks.
24//!
25//! ## Example
26//!
27//! ```rust,ignore
28//! use hyperi_rustlib::transport::{GrpcTransport, GrpcConfig, TransportReceiver};
29//!
30//! // Server mode (receive from remote senders)
31//! let config = GrpcConfig::server("0.0.0.0:6000");
32//! let transport = GrpcTransport::new(&config).await?;
33//!
34//! let messages = transport.recv(100).await?.messages;
35//! // commit is a no-op for gRPC (no persistence)
36//! transport.commit(&[]).await?;
37//! ```
38
39pub mod config;
40pub mod proto;
41pub mod token;
42
43pub use config::GrpcConfig;
44pub use token::GrpcToken;
45
46use super::error::{TransportError, TransportResult};
47use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
48use super::types::{Message, PayloadFormat, SendResult};
49use std::collections::HashMap;
50use std::sync::Arc;
51use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
52use tokio::sync::{mpsc, oneshot};
53use tonic::{Request, Response, Status};
54
55/// gRPC transport for DFE inter-service communication.
56///
57/// Implements both `TransportSender` and `TransportReceiver`, so it also
58/// satisfies the unified `Transport` trait via blanket impl.
59pub struct GrpcTransport {
60    /// Client for sending (None if server-only mode).
61    client: Option<proto::dfe_transport_client::DfeTransportClient<tonic::transport::Channel>>,
62
63    /// Receiver channel (None if client-only mode).
64    receiver: Option<tokio::sync::Mutex<mpsc::Receiver<Message<GrpcToken>>>>,
65
66    /// Shutdown signal for the server task. Behind a `Mutex<Option<..>>` so
67    /// `close(&self)` (not just `Drop`) can take and fire it.
68    shutdown_tx: parking_lot::Mutex<Option<oneshot::Sender<()>>>,
69
70    /// Server background task handle (kept alive, aborted on drop).
71    _server_handle: Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,
72
73    /// Whether the transport is closed.
74    closed: AtomicBool,
75
76    /// Shared healthy flag -- read by health registry closure, written by close().
77    healthy: Arc<AtomicBool>,
78
79    /// Receive timeout (milliseconds).
80    recv_timeout_ms: u64,
81
82    /// Per-RPC send deadline (milliseconds, 0 = none).
83    send_timeout_ms: u64,
84
85    /// In-flight send count (for metrics).
86    #[cfg(feature = "metrics")]
87    inflight: AtomicU64,
88
89    /// Transport-level message filter engine.
90    filter_engine: super::filter::TransportFilterEngine,
91}
92
93/// Build a tonic `ClientTlsConfig` from the unified TLS fields on `GrpcConfig`.
94///
95/// tonic owns its TLS stack (like librdkafka), so this maps the unified
96/// `TlsTrust` vocabulary onto `ClientTlsConfig`: private-CA PEM (else OS native
97/// roots), optional SNI domain override, and optional mTLS client identity.
98fn build_grpc_client_tls(
99    config: &GrpcConfig,
100) -> TransportResult<tonic::transport::ClientTlsConfig> {
101    use tonic::transport::{Certificate, ClientTlsConfig, Identity};
102
103    let mut tls = ClientTlsConfig::new();
104
105    if let Some(ref ca) = config.tls_ca_path {
106        let pem = std::fs::read(ca)
107            .map_err(|e| TransportError::Config(format!("gRPC TLS: cannot read ca {ca}: {e}")))?;
108        tls = tls.ca_certificate(Certificate::from_pem(pem));
109    } else {
110        // No private CA -> trust the OS native roots.
111        tls = tls.with_native_roots();
112    }
113
114    if let Some(ref domain) = config.tls_domain {
115        tls = tls.domain_name(domain.clone());
116    }
117
118    // mTLS identity -- both cert and key, or neither.
119    match (&config.tls_client_cert_path, &config.tls_client_key_path) {
120        (Some(cert), Some(key)) => {
121            let cert_pem = std::fs::read(cert).map_err(|e| {
122                TransportError::Config(format!("gRPC TLS: cannot read client cert {cert}: {e}"))
123            })?;
124            let key_pem = std::fs::read(key).map_err(|e| {
125                TransportError::Config(format!("gRPC TLS: cannot read client key {key}: {e}"))
126            })?;
127            tls = tls.identity(Identity::from_pem(cert_pem, key_pem));
128        }
129        (None, None) => {}
130        _ => {
131            return Err(TransportError::Config(
132                "gRPC TLS: mTLS requires BOTH tls_client_cert_path and tls_client_key_path"
133                    .to_string(),
134            ));
135        }
136    }
137
138    Ok(tls)
139}
140
141impl GrpcTransport {
142    /// Create a new gRPC transport.
143    ///
144    /// # Configuration
145    ///
146    /// - Set `config.listen` to start a gRPC server (receive mode).
147    /// - Set `config.endpoint` to connect to a remote server (send mode).
148    /// - Set both for bidirectional communication.
149    ///
150    /// # Errors
151    ///
152    /// Returns error if the listen address is invalid or the server fails to start.
153    pub async fn new(config: &GrpcConfig) -> TransportResult<Self> {
154        let mut client = None;
155        let mut receiver = None;
156        let mut shutdown_tx = None;
157        let mut server_handle = None;
158        let sequence = Arc::new(AtomicU64::new(0));
159
160        // Set up client (lazy connection -- doesn't fail until first RPC)
161        if let Some(endpoint) = &config.endpoint {
162            let mut ep = tonic::transport::Channel::from_shared(endpoint.clone())
163                .map_err(|e| TransportError::Config(format!("invalid endpoint: {e}")))?;
164
165            // Client TLS. tonic owns its TLS stack, so we map the unified
166            // vocabulary onto ClientTlsConfig (private CA, mTLS identity, SNI).
167            if config.tls_enabled {
168                ep = ep
169                    .tls_config(build_grpc_client_tls(config)?)
170                    .map_err(|e| TransportError::Config(format!("gRPC TLS config: {e}")))?;
171            }
172
173            let channel = ep.connect_lazy();
174
175            let mut c = proto::dfe_transport_client::DfeTransportClient::new(channel)
176                .max_decoding_message_size(config.max_message_size)
177                .max_encoding_message_size(config.max_message_size);
178
179            if config.compression {
180                c = c
181                    .send_compressed(tonic::codec::CompressionEncoding::Gzip)
182                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
183            }
184
185            client = Some(c);
186        }
187
188        // Set up server
189        if let Some(listen) = &config.listen {
190            let addr: std::net::SocketAddr = listen
191                .parse()
192                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
193
194            let (tx, rx) = mpsc::channel(config.recv_buffer_size);
195            let (sd_tx, sd_rx) = oneshot::channel();
196
197            // DFE native service
198            let dfe_svc = DfeTransportServiceImpl {
199                sender: tx.clone(),
200                sequence: sequence.clone(),
201            };
202
203            let dfe_server = proto::dfe_transport_server::DfeTransportServer::new(dfe_svc)
204                .max_decoding_message_size(config.max_message_size)
205                .max_encoding_message_size(config.max_message_size)
206                .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
207                .send_compressed(tonic::codec::CompressionEncoding::Gzip);
208
209            // Build server with optional Vector compat
210            let mut builder = tonic::transport::Server::builder();
211
212            #[cfg(feature = "transport-grpc-vector-compat")]
213            let router = if config.vector_compat {
214                let vector_svc =
215                    super::vector_compat::source::VectorCompatService::new(tx, sequence.clone());
216                let vector_server =
217                    super::vector_compat::proto::vector::vector_server::VectorServer::new(
218                        vector_svc,
219                    )
220                    .max_decoding_message_size(config.max_message_size)
221                    .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
222                    .send_compressed(tonic::codec::CompressionEncoding::Gzip);
223
224                builder.add_service(dfe_server).add_service(vector_server)
225            } else {
226                builder.add_service(dfe_server)
227            };
228
229            #[cfg(not(feature = "transport-grpc-vector-compat"))]
230            let router = builder.add_service(dfe_server);
231
232            // Bind the listener synchronously BEFORE spawning the serve task.
233            // Once `TcpListener::bind` returns the OS socket is listening and
234            // queues incoming connections, so `new()` returning is a true
235            // readiness signal -- callers (and their tests) can connect
236            // immediately with no polling. `serve_with_shutdown(addr, ..)`
237            // bound inside the spawned task, which made `new()` return before
238            // the socket existed and forced every consumer to poll the port.
239            let listener = tokio::net::TcpListener::bind(addr)
240                .await
241                .map_err(|e| TransportError::Config(format!("failed to bind {addr}: {e}")))?;
242            let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
243
244            let handle = tokio::spawn(async move {
245                router
246                    .serve_with_incoming_shutdown(incoming, async {
247                        sd_rx.await.ok();
248                    })
249                    .await
250            });
251
252            receiver = Some(tokio::sync::Mutex::new(rx));
253            shutdown_tx = Some(sd_tx);
254            server_handle = Some(handle);
255        }
256
257        let healthy = Arc::new(AtomicBool::new(true));
258
259        let filter_engine = super::filter::TransportFilterEngine::new(
260            &config.filters_in,
261            &config.filters_out,
262            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
263        )?;
264
265        #[cfg(feature = "health")]
266        {
267            let h = Arc::clone(&healthy);
268            crate::health::HealthRegistry::register("transport:grpc", move || {
269                if h.load(Ordering::Relaxed) {
270                    crate::health::HealthStatus::Healthy
271                } else {
272                    crate::health::HealthStatus::Unhealthy
273                }
274            });
275        }
276
277        Ok(Self {
278            client,
279            receiver,
280            shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
281            _server_handle: server_handle,
282            closed: AtomicBool::new(false),
283            healthy,
284            recv_timeout_ms: config.recv_timeout_ms,
285            send_timeout_ms: config.send_timeout_ms,
286            #[cfg(feature = "metrics")]
287            inflight: AtomicU64::new(0),
288            filter_engine,
289        })
290    }
291}
292
293impl TransportBase for GrpcTransport {
294    async fn close(&self) -> TransportResult<()> {
295        self.closed.store(true, Ordering::Relaxed);
296        self.healthy.store(false, Ordering::Relaxed);
297
298        // Actually stop the server: fire the shutdown oneshot so
299        // serve_with_incoming_shutdown completes and the listener is freed.
300        // Idempotent -- a second close() (or Drop) finds None.
301        if let Some(tx) = self.shutdown_tx.lock().take() {
302            let _ = tx.send(());
303        }
304        Ok(())
305    }
306
307    fn is_healthy(&self) -> bool {
308        let healthy = self.healthy.load(Ordering::Relaxed);
309        #[cfg(feature = "metrics")]
310        metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
311            1.0
312        } else {
313            0.0
314        });
315        healthy
316    }
317
318    fn name(&self) -> &'static str {
319        "grpc"
320    }
321}
322
323impl TransportSender for GrpcTransport {
324    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
325        if self.closed.load(Ordering::Relaxed) {
326            return SendResult::Fatal(TransportError::Closed);
327        }
328
329        // Outbound filter check
330        if self.filter_engine.has_outbound_filters() {
331            match self.filter_engine.apply_outbound(&payload) {
332                super::filter::FilterDisposition::Pass => {}
333                super::filter::FilterDisposition::Drop => return SendResult::Ok,
334                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
335            }
336        }
337
338        let Some(client) = &self.client else {
339            return SendResult::Fatal(TransportError::Config(
340                "no endpoint configured for sending".into(),
341            ));
342        };
343
344        let mut metadata = HashMap::new();
345        if !key.is_empty() {
346            metadata.insert("topic".to_string(), key.to_string());
347        }
348
349        // Inject W3C traceparent into gRPC metadata for distributed tracing
350        #[cfg(feature = "transport-trace")]
351        if let Some(tp) = super::propagation::current_traceparent() {
352            metadata.insert(super::propagation::TRACEPARENT_HEADER.to_string(), tp);
353        }
354
355        let mut request = tonic::Request::new(proto::PushRequest {
356            payload: payload.to_vec(),
357            format: proto::Format::Auto.into(),
358            metadata,
359        });
360
361        // Bound the RPC so a hung/black-holing server cannot wedge the sender
362        // task forever. Sent as the grpc-timeout header; the server aborts and
363        // the client surfaces Code::DeadlineExceeded when it elapses.
364        if self.send_timeout_ms > 0 {
365            request.set_timeout(std::time::Duration::from_millis(self.send_timeout_ms));
366        }
367
368        #[cfg(feature = "metrics")]
369        let start = std::time::Instant::now();
370
371        #[cfg(feature = "metrics")]
372        self.inflight.fetch_add(1, Ordering::Relaxed);
373
374        // tonic clients are cheaply cloneable (shared channel)
375        let result = match client.clone().push(request).await {
376            Ok(_) => {
377                #[cfg(feature = "metrics")]
378                metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
379                SendResult::Ok
380            }
381            Err(status) => match status.code() {
382                // DeadlineExceeded = our send_timeout_ms fired (slow/hung server).
383                // Transient -- treat as backpressure so the caller retries rather
384                // than dropping the message.
385                tonic::Code::Unavailable
386                | tonic::Code::ResourceExhausted
387                | tonic::Code::DeadlineExceeded => {
388                    #[cfg(feature = "metrics")]
389                    metrics::counter!(
390                        "dfe_transport_backpressured_total",
391                        "transport" => "grpc"
392                    )
393                    .increment(1);
394                    SendResult::Backpressured
395                }
396                _ => {
397                    #[cfg(feature = "metrics")]
398                    metrics::counter!(
399                        "dfe_transport_send_errors_total",
400                        "transport" => "grpc"
401                    )
402                    .increment(1);
403                    SendResult::Fatal(TransportError::Send(status.message().to_string()))
404                }
405            },
406        };
407
408        #[cfg(feature = "metrics")]
409        {
410            self.inflight.fetch_sub(1, Ordering::Relaxed);
411            metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
412                .set(self.inflight.load(Ordering::Relaxed) as f64);
413            metrics::histogram!(
414                "dfe_transport_send_duration_seconds",
415                "transport" => "grpc"
416            )
417            .record(start.elapsed().as_secs_f64());
418        }
419
420        result
421    }
422}
423
424impl TransportReceiver for GrpcTransport {
425    type Token = GrpcToken;
426
427    async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
428        if self.closed.load(Ordering::Relaxed) {
429            return Err(TransportError::Closed);
430        }
431
432        let Some(receiver) = &self.receiver else {
433            return Err(TransportError::Config(
434                "no listen address configured for receiving".into(),
435            ));
436        };
437
438        let mut rx = receiver.lock().await;
439        let mut messages = Vec::with_capacity(max.min(100));
440
441        for _ in 0..max {
442            let result = if self.recv_timeout_ms == 0 {
443                // Non-blocking
444                match rx.try_recv() {
445                    Ok(msg) => Some(msg),
446                    Err(mpsc::error::TryRecvError::Empty) => break,
447                    Err(mpsc::error::TryRecvError::Disconnected) => {
448                        return Err(TransportError::Closed);
449                    }
450                }
451            } else if messages.is_empty() {
452                // First message: wait with timeout
453                match tokio::time::timeout(
454                    std::time::Duration::from_millis(self.recv_timeout_ms),
455                    rx.recv(),
456                )
457                .await
458                {
459                    Ok(Some(msg)) => Some(msg),
460                    Ok(None) => return Err(TransportError::Closed),
461                    Err(_) => break, // Timeout
462                }
463            } else {
464                // Subsequent: non-blocking drain
465                match rx.try_recv() {
466                    Ok(msg) => Some(msg),
467                    Err(_) => break,
468                }
469            };
470
471            if let Some(msg) = result {
472                messages.push(msg);
473            }
474        }
475
476        // Apply inbound filters via the shared partition helper; DLQ entries
477        // are returned in the RecvBatch for the caller to route onward.
478        let batch = self.filter_engine.partition_batch(
479            messages,
480            |m| m.payload.as_slice(),
481            |m| m.key.clone(),
482        );
483        let messages = batch.messages;
484        let dlq_entries = batch.dlq_entries;
485
486        Ok(RecvBatch {
487            messages,
488            dlq_entries,
489        })
490    }
491
492    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
493        // gRPC has no broker-side persistence -- commit is a no-op.
494        // Acknowledgement is implicit in the Push RPC response.
495        Ok(())
496    }
497}
498
499impl Drop for GrpcTransport {
500    fn drop(&mut self) {
501        // Fire the shutdown signal if close() didn't already (idempotent).
502        if let Some(tx) = self.shutdown_tx.lock().take() {
503            let _ = tx.send(());
504        }
505        // Server handle will be dropped, which aborts the task
506    }
507}
508
509// --- DFE Transport gRPC service implementation ---
510
511/// Internal service implementation that receives Push RPCs
512/// and forwards messages into the transport's mpsc channel.
513struct DfeTransportServiceImpl {
514    sender: mpsc::Sender<Message<GrpcToken>>,
515    sequence: Arc<AtomicU64>,
516}
517
518#[tonic::async_trait]
519impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
520    async fn push(
521        &self,
522        request: Request<proto::PushRequest>,
523    ) -> Result<Response<proto::PushResponse>, Status> {
524        let req = request.into_inner();
525        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
526
527        // Extract W3C traceparent from incoming gRPC metadata for distributed tracing
528        #[cfg(feature = "transport-trace")]
529        if let Some(tp) = req.metadata.get(super::propagation::TRACEPARENT_HEADER)
530            && super::propagation::is_valid_traceparent(tp)
531        {
532            tracing::Span::current().record("traceparent", tp.as_str());
533        }
534
535        let format = PayloadFormat::detect(&req.payload);
536        let key = req.metadata.get("topic").map(|s| Arc::from(s.as_str()));
537
538        let msg = Message {
539            key,
540            payload: req.payload,
541            token: GrpcToken::new(seq),
542            timestamp_ms: None,
543            format,
544        };
545
546        match self.sender.try_send(msg) {
547            Ok(()) => {
548                #[cfg(feature = "metrics")]
549                {
550                    metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
551                        .increment(1);
552                    metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
553                        self.sender
554                            .max_capacity()
555                            .saturating_sub(self.sender.capacity()) as f64,
556                    );
557                }
558                Ok(Response::new(proto::PushResponse { accepted: 1 }))
559            }
560            Err(mpsc::error::TrySendError::Full(_)) => {
561                #[cfg(feature = "metrics")]
562                metrics::counter!(
563                    "dfe_transport_backpressured_total",
564                    "transport" => "grpc"
565                )
566                .increment(1);
567                Err(Status::resource_exhausted("receiver buffer full"))
568            }
569            Err(mpsc::error::TrySendError::Closed(_)) => {
570                #[cfg(feature = "metrics")]
571                metrics::counter!(
572                    "dfe_transport_refused_total",
573                    "transport" => "grpc"
574                )
575                .increment(1);
576                Err(Status::unavailable("receiver closed"))
577            }
578        }
579    }
580
581    async fn health_check(
582        &self,
583        _request: Request<proto::HealthCheckRequest>,
584    ) -> Result<Response<proto::HealthCheckResponse>, Status> {
585        Ok(Response::new(proto::HealthCheckResponse {
586            status: proto::ServingStatus::Serving.into(),
587        }))
588    }
589}
590
#[cfg(test)]
mod tests {
    use super::*;

    /// Token `Display` output, with and without a source label.
    #[test]
    fn grpc_token_display() {
        let token = GrpcToken::new(42);
        assert_eq!(format!("{token}"), "grpc:42");

        let token = GrpcToken::with_source(7, Arc::from("peer-1"));
        assert_eq!(format!("{token}"), "grpc:peer-1:7");
    }

    /// Pins the documented defaults of `GrpcConfig`.
    #[test]
    fn grpc_config_defaults() {
        let config = GrpcConfig::default();
        assert!(config.listen.is_none());
        assert!(config.endpoint.is_none());
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
        assert_eq!(config.send_timeout_ms, 30_000);
        assert_eq!(config.max_message_size, 16 * 1024 * 1024);
        assert!(!config.compression);
        assert!(!config.tls_enabled);
        assert!(config.tls_ca_path.is_none());
    }

    /// A private CA plus SNI override builds; either half of an mTLS pair on
    /// its own is rejected.
    #[test]
    fn grpc_client_tls_builds_with_private_ca_and_rejects_half_mtls() {
        use std::io::Write;
        let cert = rcgen::generate_simple_self_signed(vec!["grpc.test".to_string()]).unwrap();
        let mut ca = tempfile::NamedTempFile::new().unwrap();
        ca.write_all(cert.cert.pem().as_bytes()).unwrap();
        ca.flush().unwrap();

        // Private CA + SNI -> builds.
        let cfg = GrpcConfig {
            endpoint: Some("https://peer:6000".to_string()),
            tls_enabled: true,
            tls_ca_path: Some(ca.path().to_string_lossy().into_owned()),
            tls_domain: Some("grpc.test".to_string()),
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&cfg).is_ok());

        // Half-configured mTLS (cert without key) -> error.
        let cfg = GrpcConfig {
            tls_enabled: true,
            tls_client_cert_path: Some(ca.path().to_string_lossy().into_owned()),
            tls_client_key_path: None,
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&cfg).is_err());

        // Half-configured mTLS (key without cert) -> error as well.
        let cfg = GrpcConfig {
            tls_enabled: true,
            tls_client_cert_path: None,
            tls_client_key_path: Some(ca.path().to_string_lossy().into_owned()),
            ..Default::default()
        };
        assert!(build_grpc_client_tls(&cfg).is_err());
    }

    #[test]
    fn grpc_config_server() {
        let config = GrpcConfig::server("0.0.0.0:6000");
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:6000"));
        assert!(config.endpoint.is_none());
    }

    #[test]
    fn grpc_config_client() {
        let config = GrpcConfig::client("http://loader:6000");
        assert!(config.listen.is_none());
        assert_eq!(config.endpoint.as_deref(), Some("http://loader:6000"));
    }

    #[test]
    fn grpc_config_with_compression() {
        let config = GrpcConfig::server("0.0.0.0:6000").with_compression();
        assert!(config.compression);
    }

    #[tokio::test]
    async fn grpc_transport_client_only() {
        // Client-only transport (lazy connection, no server)
        let config = GrpcConfig::client("http://localhost:16000");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_some());
        assert!(transport.receiver.is_none());
        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "grpc");

        // recv should error (no server)
        let result = transport.recv(10).await;
        assert!(result.is_err());

        // commit is always ok
        transport.commit(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn grpc_transport_server_only() {
        // Server-only transport (no client for sending).
        // Port 0 lets the OS pick a free port, so the test cannot collide with
        // another process or a parallel test run holding a fixed port.
        let config = GrpcConfig::server("127.0.0.1:0");
        let transport = GrpcTransport::new(&config).await.unwrap();

        assert!(transport.client.is_none());
        assert!(transport.receiver.is_some());
        assert!(transport.is_healthy());

        // send should error (no client)
        let result = transport
            .send("test", bytes::Bytes::from_static(b"payload"))
            .await;
        assert!(result.is_fatal());

        // Close
        transport.close().await.unwrap();
        assert!(!transport.is_healthy());
    }
}