Skip to main content

zingo_netutils/
lib.rs

1//! `zingo-netutils`
2//!
3//! This crate provides the [`Indexer`] trait for communicating with a Zcash chain indexer,
4//! and [`GrpcIndexer`], a concrete implementation that connects to a zainod server via gRPC.
5
6use std::future::Future;
7use std::time::Duration;
8
9#[cfg(test)]
10use tokio_rustls::rustls::RootCertStore;
11use tonic::Request;
12use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
13use zcash_client_backend::proto::service::{
14    BlockId, ChainSpec, Empty, LightdInfo, RawTransaction, TreeState,
15    compact_tx_streamer_client::CompactTxStreamerClient,
16};
17
/// Errors that can occur while building a gRPC client in
/// [`GrpcIndexer::get_client`].
#[derive(Debug, thiserror::Error)]
pub enum GetClientError {
    /// The URI has no scheme, or its scheme is neither `http` nor `https`.
    #[error("bad uri: invalid scheme")]
    InvalidScheme,

    /// The URI has no authority component.
    #[error("bad uri: invalid authority")]
    InvalidAuthority,

    /// NOTE(review): no code visible in this file constructs this variant;
    /// presumably retained for API compatibility — confirm before removing.
    #[error("bad uri: invalid path and/or query")]
    InvalidPathAndQuery,

    /// An error reported by the tonic transport layer (converted via `From`).
    #[error(transparent)]
    Transport(#[from] tonic::transport::Error),

    /// No URI is configured, so there is nothing to connect to.
    #[error("no uri: no connection")]
    NoUri,
}
35
/// Reads the raw bytes of the test certificate at `test-data/localhost.pem`.
///
/// Returns `None` when the file cannot be read; the underlying I/O error is
/// discarded.
#[cfg(test)]
fn load_test_cert_pem() -> Option<Vec<u8>> {
    match std::fs::read("test-data/localhost.pem") {
        Ok(pem_bytes) => Some(pem_bytes),
        Err(_) => None,
    }
}
41fn client_tls_config() -> Result<ClientTlsConfig, GetClientError> {
42    // Allow self-signed certs in tests
43    #[cfg(test)]
44    {
45        if let Some(pem) = load_test_cert_pem() {
46            return Ok(ClientTlsConfig::new()
47                .ca_certificate(tonic::transport::Certificate::from_pem(pem))
48                .with_webpki_roots());
49        }
50    }
51
52    Ok(ClientTlsConfig::new().with_webpki_roots())
53}
54
/// Per-request timeout (10 seconds) used when setting a timeout on outgoing
/// gRPC requests.
const DEFAULT_GRPC_TIMEOUT: Duration = Duration::from_secs(10);
56
/// Error type for [`GrpcIndexer`] operations.
#[derive(Debug, thiserror::Error)]
pub enum GrpcIndexerError {
    /// A client could not be obtained; see [`GetClientError`] for the causes.
    #[error(transparent)]
    Client(#[from] GetClientError),

    /// The gRPC call itself returned an error status.
    #[error("gRPC error: {0}")]
    Status(#[from] tonic::Status),

    /// The server answered a transaction submission with a non-zero error
    /// code. The payload is the `Debug` rendering of the server's response.
    #[error("send rejected: {0}")]
    SendRejected(String),
}
69
/// Trait for communicating with a Zcash chain indexer.
///
/// Every method returns a future resolving to `Result<_, Self::Error>`.
pub trait Indexer {
    /// Error type returned by every operation of this indexer.
    type Error;

    /// Fetches the indexer's [`LightdInfo`].
    fn get_info(&self) -> impl Future<Output = Result<LightdInfo, Self::Error>>;
    /// Fetches the [`BlockId`] of the latest block known to the indexer.
    fn get_latest_block(&self) -> impl Future<Output = Result<BlockId, Self::Error>>;
    /// Submits the raw transaction bytes `tx_bytes` to the indexer.
    ///
    /// On success the returned `String` identifies the transaction
    /// (presumably its txid — the exact format depends on the implementation).
    fn send_transaction(
        &self,
        tx_bytes: Box<[u8]>,
    ) -> impl Future<Output = Result<String, Self::Error>>;
    /// Fetches the [`TreeState`] for the block at `height`.
    fn get_trees(&self, height: u64) -> impl Future<Output = Result<TreeState, Self::Error>>;
}
82
/// gRPC-backed [`Indexer`] that connects to a lightwalletd server.
///
/// The indexer stores only an optional URI. `None` is the disconnected
/// state, in which [`GrpcIndexer::get_client`] fails with
/// [`GetClientError::NoUri`].
#[derive(Clone, Debug)]
pub struct GrpcIndexer {
    /// Server URI to connect to, or `None` when disconnected.
    uri: Option<http::Uri>,
}
88
89impl GrpcIndexer {
90    pub fn new(uri: http::Uri) -> Self {
91        Self { uri: Some(uri) }
92    }
93
94    pub fn disconnected() -> Self {
95        Self { uri: None }
96    }
97
98    pub fn uri(&self) -> Option<&http::Uri> {
99        self.uri.as_ref()
100    }
101
102    pub fn set_uri(&mut self, uri: http::Uri) {
103        self.uri = Some(uri);
104    }
105
106    pub fn disconnect(&mut self) {
107        self.uri = None;
108    }
109
110    /// The connector, containing the URI to connect to.
111    /// This type is mostly an interface to the `get_client` method.
112    /// The proto-generated `CompactTxStreamerClient` type is the main
113    /// interface to actually communicating with a Zcash indexer.
114    /// Connect to the URI, and return a Client. For the full list of methods
115    /// the client supports, see the service.proto file (some of the types
116    /// are defined in the `compact_formats.proto` file).
117    pub async fn get_client(&self) -> Result<CompactTxStreamerClient<Channel>, GetClientError> {
118        let uri = self.uri.as_ref().ok_or(GetClientError::NoUri)?;
119        let scheme = uri.scheme_str().ok_or(GetClientError::InvalidScheme)?;
120        if scheme != "http" && scheme != "https" {
121            return Err(GetClientError::InvalidScheme);
122        }
123        let _authority = uri.authority().ok_or(GetClientError::InvalidAuthority)?;
124
125        let endpoint = Endpoint::from_shared(uri.to_string())?.tcp_nodelay(true);
126
127        let channel = if scheme == "https" {
128            let tls = client_tls_config()?;
129            endpoint.tls_config(tls)?.connect().await?
130        } else {
131            endpoint.connect().await?
132        };
133
134        Ok(CompactTxStreamerClient::new(channel))
135    }
136}
137
138impl Indexer for GrpcIndexer {
139    type Error = GrpcIndexerError;
140
141    async fn get_info(&self) -> Result<LightdInfo, GrpcIndexerError> {
142        let mut client = self.get_client().await?;
143        let mut request = Request::new(Empty {});
144        request.set_timeout(DEFAULT_GRPC_TIMEOUT);
145        let response = client.get_lightd_info(request).await?;
146        Ok(response.into_inner())
147    }
148
149    async fn get_latest_block(&self) -> Result<BlockId, GrpcIndexerError> {
150        let mut client = self.get_client().await?;
151        let mut request = Request::new(ChainSpec {});
152        request.set_timeout(DEFAULT_GRPC_TIMEOUT);
153        let response = client.get_latest_block(request).await?;
154        Ok(response.into_inner())
155    }
156
157    async fn send_transaction(&self, tx_bytes: Box<[u8]>) -> Result<String, GrpcIndexerError> {
158        let mut client = self.get_client().await?;
159        let mut request = Request::new(RawTransaction {
160            data: tx_bytes.to_vec(),
161            height: 0,
162        });
163        request.set_timeout(DEFAULT_GRPC_TIMEOUT);
164        let response = client.send_transaction(request).await?;
165        let sendresponse = response.into_inner();
166        if sendresponse.error_code == 0 {
167            let mut transaction_id = sendresponse.error_message;
168            if transaction_id.starts_with('\"') && transaction_id.ends_with('\"') {
169                transaction_id = transaction_id[1..transaction_id.len() - 1].to_string();
170            }
171            Ok(transaction_id)
172        } else {
173            Err(GrpcIndexerError::SendRejected(format!("{sendresponse:?}")))
174        }
175    }
176
177    async fn get_trees(&self, height: u64) -> Result<TreeState, GrpcIndexerError> {
178        let mut client = self.get_client().await?;
179        let response = client
180            .get_tree_state(Request::new(BlockId {
181                height,
182                hash: vec![],
183            }))
184            .await?;
185        Ok(response.into_inner())
186    }
187}
188
/// Adds the certificates found in `test-data/localhost.pem` to `roots`.
///
/// If the file cannot be opened, a notice is printed to stderr and `roots`
/// is left untouched. PEM entries that fail to parse are skipped silently.
#[cfg(test)]
fn add_test_cert_to_roots(roots: &mut RootCertStore) {
    use tonic::transport::CertificateDer;

    const TEST_PEMFILE_PATH: &str = "test-data/localhost.pem";

    eprintln!("Adding test cert to roots");

    let mut reader = match std::fs::File::open(TEST_PEMFILE_PATH) {
        Ok(file) => std::io::BufReader::new(file),
        Err(_) => {
            eprintln!("Test TLS cert not found at {TEST_PEMFILE_PATH}, skipping");
            return;
        }
    };

    let certs: Vec<CertificateDer<'_>> = rustls_pemfile::certs(&mut reader)
        .filter_map(Result::ok)
        .collect();
    roots.add_parsable_certificates(certs);
}
209
210#[cfg(test)]
211mod tests {
212    //! Unit and integration-style tests for `zingo-netutils`.
213    //!
214    //! These tests focus on:
215    //! - TLS test asset sanity (`test-data/localhost.pem` + `.key`)
216    //! - Rustls plumbing (adding a local cert to a root store)
217    //! - Connector correctness (scheme validation, HTTP/2 expectations)
218    //! - URI rewrite behavior (no panics; returns structured errors)
219    //!
220    //! Notes:
221    //! - Some tests spin up an in-process TLS server and use aggressive timeouts to
222    //!   avoid hangs under nextest.
223    //! - We explicitly install a rustls crypto provider to avoid
224    //!   provider-selection panics in test binaries.
225
226    use std::time::Duration;
227
228    use http::{Request, Response};
229    use hyper::{
230        body::{Bytes, Incoming},
231        service::service_fn,
232    };
233    use hyper_util::rt::TokioIo;
234    use tokio::{net::TcpListener, sync::oneshot, time::timeout};
235    use tokio_rustls::{TlsAcceptor, rustls};
236
237    use super::*;
238
239    /// Ensures the committed localhost test certificate exists and is parseable as X.509.
240    ///
241    /// This catches:
242    /// - missing file / wrong working directory assumptions
243    /// - invalid PEM encoding
244    /// - accidentally committing the wrong artifact (e.g., key instead of cert)
245    #[test]
246    fn localhost_cert_file_exists_and_is_parseable() {
247        const CERT_PATH: &str = "test-data/localhost.pem";
248
249        let pem = std::fs::read(CERT_PATH).expect("missing test-data/localhost.pem");
250
251        let mut cursor = std::io::BufReader::new(pem.as_slice());
252        let certs = rustls_pemfile::certs(&mut cursor)
253            .filter_map(Result::ok)
254            .collect::<Vec<_>>();
255
256        assert!(!certs.is_empty(), "no certs found in {CERT_PATH}");
257
258        for cert in certs {
259            let der = cert.as_ref();
260            let parsed = x509_parser::parse_x509_certificate(der);
261            assert!(
262                parsed.is_ok(),
263                "failed to parse a cert from {CERT_PATH} as X.509"
264            );
265        }
266    }
267
268    /// Guards against committing a CA certificate as the TLS server certificate.
269    ///
270    /// Rustls rejects certificates with CA constraints when used as an end-entity
271    /// server certificate (e.g. `CaUsedAsEndEntity`), even if the cert is in the
272    /// root store. This test ensures the committed localhost cert has `CA:FALSE`.
273    #[test]
274    fn localhost_cert_is_end_entity_not_ca() {
275        let pem =
276            std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
277        let mut cursor = std::io::BufReader::new(pem.as_slice());
278
279        let certs = rustls_pemfile::certs(&mut cursor)
280            .filter_map(Result::ok)
281            .collect::<Vec<_>>();
282
283        assert!(!certs.is_empty(), "no certs found in localhost.pem");
284
285        let der = certs[0].as_ref();
286        let parsed = x509_parser::parse_x509_certificate(der).expect("failed to parse X.509");
287        let x509 = parsed.1;
288
289        let constraints = x509
290            .basic_constraints()
291            .expect("missing basic constraints extension");
292
293        assert!(
294            !constraints.unwrap().value.ca,
295            "localhost.pem must be CA:FALSE"
296        );
297    }
298
299    /// Loads a rustls `ServerConfig` for a local TLS server using the committed
300    /// test certificate and private key.
301    ///
302    /// The cert/key pair is *test-only* and is stored under `test-data/`.
303    /// This is used to verify that the client-side root-store injection
304    /// (`add_test_cert_to_roots`) actually enables successful TLS handshakes.
305    fn load_test_server_config() -> std::sync::Arc<rustls::ServerConfig> {
306        let cert_pem =
307            std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
308        let key_pem =
309            std::fs::read("test-data/localhost.key").expect("missing test-data/localhost.key");
310
311        let mut cert_cursor = std::io::BufReader::new(cert_pem.as_slice());
312        let mut key_cursor = std::io::BufReader::new(key_pem.as_slice());
313
314        let certs = rustls_pemfile::certs(&mut cert_cursor)
315            .filter_map(Result::ok)
316            .map(rustls::pki_types::CertificateDer::from)
317            .collect::<Vec<_>>();
318
319        let key = rustls_pemfile::private_key(&mut key_cursor)
320            .expect("failed to read private key")
321            .expect("no private key found");
322
323        let config = rustls::ServerConfig::builder()
324            .with_no_client_auth()
325            .with_single_cert(certs, key)
326            .expect("bad cert or key");
327
328        std::sync::Arc::new(config)
329    }
330    /// Smoke test: adding the committed localhost cert to a rustls root store enables
331    /// a client to complete a TLS handshake and perform an HTTP request.
332    ///
333    /// Implementation notes:
334    /// - Uses a local TLS server with the committed cert/key.
335    /// - Uses strict timeouts to prevent hangs under nextest.
336    /// - Explicitly drains the request body and disables keep-alive so that
337    ///   `serve_connection` terminates deterministically.
338    /// - Installs the rustls crypto provider to avoid provider
339    ///   selection panics in test binaries.
340    #[tokio::test]
341    async fn add_test_cert_to_roots_enables_tls_handshake() {
342        use http_body_util::Full;
343        use hyper::service::service_fn;
344        use hyper_util::rt::TokioIo;
345        use tokio::net::TcpListener;
346        use tokio_rustls::TlsAcceptor;
347        use tokio_rustls::rustls;
348
349        let _ = rustls::crypto::ring::default_provider().install_default();
350
351        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
352        let addr = listener.local_addr().expect("local_addr failed");
353
354        let tls_config = load_test_server_config();
355        let acceptor = TlsAcceptor::from(tls_config);
356
357        let ready = oneshot::channel::<()>();
358        let ready_tx = ready.0;
359        let ready_rx = ready.1;
360
361        let server_task = tokio::spawn(async move {
362            let _ = ready_tx.send(());
363
364            let accept_res = timeout(Duration::from_secs(3), listener.accept()).await;
365            let (socket, _) = accept_res
366                .expect("server accept timed out")
367                .expect("accept failed");
368
369            let tls_stream = timeout(Duration::from_secs(3), acceptor.accept(socket))
370                .await
371                .expect("tls accept timed out")
372                .expect("tls accept failed");
373
374            let io = TokioIo::new(tls_stream);
375
376            let svc = service_fn(|mut req: http::Request<hyper::body::Incoming>| async move {
377                use http_body_util::BodyExt;
378
379                while let Some(frame) = req.body_mut().frame().await {
380                    if frame.is_err() {
381                        break;
382                    }
383                }
384
385                let mut resp = http::Response::new(Full::new(Bytes::from_static(b"ok")));
386                resp.headers_mut().insert(
387                    http::header::CONNECTION,
388                    http::HeaderValue::from_static("close"),
389                );
390                Ok::<_, hyper::Error>(resp)
391            });
392
393            timeout(
394                Duration::from_secs(3),
395                hyper::server::conn::http1::Builder::new()
396                    .keep_alive(false)
397                    .serve_connection(io, svc),
398            )
399            .await
400            .expect("serve_connection timed out")
401            .expect("serve_connection failed");
402        });
403
404        let _ = timeout(Duration::from_secs(1), ready_rx)
405            .await
406            .expect("server ready signal timed out")
407            .expect("server dropped before ready");
408
409        // Build client root store and add the test cert.
410        let mut roots = rustls::RootCertStore::empty();
411        add_test_cert_to_roots(&mut roots);
412
413        let client_config = rustls::ClientConfig::builder()
414            .with_root_certificates(roots)
415            .with_no_client_auth();
416
417        // This MUST allow http1 since the server uses hyper http1 builder.
418        let https = hyper_rustls::HttpsConnectorBuilder::new()
419            .with_tls_config(client_config)
420            .https_only()
421            .enable_http1()
422            .build();
423
424        let client =
425            hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
426                .build(https);
427
428        let uri: http::Uri = format!("https://127.0.0.1:{}/", addr.port())
429            .parse()
430            .expect("bad uri");
431
432        let req = http::Request::builder()
433            .method("GET")
434            .uri(uri)
435            .body(Full::<Bytes>::new(Bytes::new()))
436            .expect("request build failed");
437
438        let res = timeout(Duration::from_secs(3), client.request(req))
439            .await
440            .expect("client request timed out")
441            .expect("TLS handshake or request failed");
442
443        assert!(res.status().is_success());
444
445        timeout(Duration::from_secs(3), server_task)
446            .await
447            .expect("server task timed out")
448            .expect("server task failed");
449    }
450
451    /// Validates that the connector rejects non-HTTP(S) URIs.
452    ///
453    /// This test is intended to fail until production code checks for:
454    /// - `http` and `https` schemes only
455    /// and rejects everything else (e.g. `ftp`).
456    #[tokio::test]
457    async fn rejects_non_http_schemes() {
458        let uri: http::Uri = "ftp://example.com:1234".parse().unwrap();
459        let res = GrpcIndexer::new(uri).get_client().await;
460
461        assert!(
462            res.is_err(),
463            "expected get_client() to reject non-http(s) schemes, but got Ok"
464        );
465    }
466
467    /// Demonstrates the HTTPS downgrade hazard: the underlying client can successfully
468    /// talk to an HTTP/1.1-only TLS server if the HTTPS branch does not enforce HTTP/2.
469    ///
470    /// This is intentionally written as a “should be HTTP/2” test so it fails until
471    /// the HTTPS client is constructed with `http2_only(true)`.
472    #[tokio::test]
473    async fn https_connector_must_not_downgrade_to_http1() {
474        use http_body_util::Full;
475
476        let _ = rustls::crypto::ring::default_provider().install_default();
477
478        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
479        let addr = listener.local_addr().expect("local_addr failed");
480
481        let tls_config = load_test_server_config();
482        let acceptor = TlsAcceptor::from(tls_config);
483
484        let server_task = tokio::spawn(async move {
485            let accept_res = timeout(Duration::from_secs(3), listener.accept()).await;
486            let (socket, _) = accept_res
487                .expect("server accept timed out")
488                .expect("accept failed");
489
490            let tls_stream = acceptor.accept(socket).await.expect("tls accept failed");
491            let io = TokioIo::new(tls_stream);
492
493            let svc = service_fn(|_req: Request<Incoming>| async move {
494                Ok::<_, hyper::Error>(Response::new(Full::new(Bytes::from_static(b"ok"))))
495            });
496
497            // This may error with VersionH2 if the client sends an h2 preface, or it may
498            // simply never be reached if ALPN fails earlier. Either is fine for this test.
499            let _ = hyper::server::conn::http1::Builder::new()
500                .serve_connection(io, svc)
501                .await;
502        });
503
504        let base = format!("https://127.0.0.1:{}", addr.port());
505        let uri = base.parse::<http::Uri>().expect("bad base uri");
506
507        let endpoint = tonic::transport::Endpoint::from_shared(uri.to_string())
508            .expect("endpoint")
509            .tcp_nodelay(true);
510
511        let tls = client_tls_config().expect("tls config");
512        let connect_res = endpoint
513            .tls_config(tls)
514            .expect("tls_config failed")
515            .connect()
516            .await;
517
518        // A gRPC (HTTP/2) client must not succeed against an HTTP/1.1-only TLS server.
519        assert!(
520            connect_res.is_err(),
521            "expected connect to fail (no downgrade to HTTP/1.1), but it succeeded"
522        );
523
524        server_task.abort();
525    }
526
527    #[tokio::test]
528    async fn connects_to_public_mainnet_indexer_and_gets_info() {
529        use std::time::Duration;
530        use tokio::time::timeout;
531        use tonic::Request;
532        use zcash_client_backend::proto::service::Empty;
533
534        let _ = rustls::crypto::ring::default_provider().install_default();
535
536        let endpoint = "https://zec.rocks:443".to_string();
537
538        let uri: http::Uri = endpoint.parse().expect("bad mainnet indexer URI");
539
540        let mut client = timeout(Duration::from_secs(10), GrpcIndexer::new(uri).get_client())
541            .await
542            .expect("timed out connecting to public indexer")
543            .expect("failed to connect to public indexer");
544
545        let response = timeout(
546            Duration::from_secs(10),
547            client.get_lightd_info(Request::new(Empty {})),
548        )
549        .await
550        .expect("timed out calling GetLightdInfo")
551        .expect("GetLightdInfo RPC failed")
552        .into_inner();
553
554        assert!(
555            !response.chain_name.is_empty(),
556            "chain_name should not be empty"
557        );
558        assert!(
559            response.block_height > 0,
560            "block_height should be > 0, got {}",
561            response.block_height
562        );
563
564        let chain = response.chain_name.to_ascii_lowercase();
565        assert!(
566            chain.contains("main"),
567            "expected a mainnet server, got chain_name={:?}",
568            response.chain_name
569        );
570    }
571}