1use std::future::Future;
7use std::time::Duration;
8
9#[cfg(test)]
10use tokio_rustls::rustls::RootCertStore;
11use tonic::Request;
12use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
13use zcash_client_backend::proto::service::{
14 BlockId, ChainSpec, Empty, LightdInfo, RawTransaction, TreeState,
15 compact_tx_streamer_client::CompactTxStreamerClient,
16};
17
/// Errors that can occur while building a gRPC client for an indexer URI.
#[derive(Debug, thiserror::Error)]
pub enum GetClientError {
    /// The URI has no scheme, or its scheme is neither `http` nor `https`.
    #[error("bad uri: invalid scheme")]
    InvalidScheme,

    /// The URI has no authority component.
    #[error("bad uri: invalid authority")]
    InvalidAuthority,

    /// The URI's path and/or query was rejected.
    // NOTE(review): nothing visible in this file constructs this variant —
    // confirm whether callers elsewhere still rely on it.
    #[error("bad uri: invalid path and/or query")]
    InvalidPathAndQuery,

    /// An error surfaced by the tonic transport layer; displayed transparently.
    #[error(transparent)]
    Transport(#[from] tonic::transport::Error),

    /// No URI is configured, so there is nothing to connect to.
    #[error("no uri: no connection")]
    NoUri,
}
35
/// Reads the PEM-encoded test certificate from `test-data/localhost.pem`.
///
/// Returns `None` when the file cannot be read for any reason, so callers can
/// fall back to their default trust configuration.
#[cfg(test)]
fn load_test_cert_pem() -> Option<Vec<u8>> {
    let pem_path = "test-data/localhost.pem";
    match std::fs::read(pem_path) {
        Ok(pem_bytes) => Some(pem_bytes),
        Err(_) => None,
    }
}
41fn client_tls_config() -> Result<ClientTlsConfig, GetClientError> {
42 #[cfg(test)]
44 {
45 if let Some(pem) = load_test_cert_pem() {
46 return Ok(ClientTlsConfig::new()
47 .ca_certificate(tonic::transport::Certificate::from_pem(pem))
48 .with_webpki_roots());
49 }
50 }
51
52 Ok(ClientTlsConfig::new().with_webpki_roots())
53}
54
/// Timeout (10 seconds) that `GrpcIndexer` sets on the gRPC requests it issues.
const DEFAULT_GRPC_TIMEOUT: Duration = Duration::from_secs(10);
56
/// Errors returned by the `Indexer` implementation for `GrpcIndexer`.
#[derive(Debug, thiserror::Error)]
pub enum GrpcIndexerError {
    /// Building the gRPC client failed; displayed transparently.
    #[error(transparent)]
    Client(#[from] GetClientError),

    /// The RPC itself failed with a gRPC status.
    #[error("gRPC error: {0}")]
    Status(#[from] tonic::Status),

    /// The server answered the send-transaction RPC with a non-zero error
    /// code. The payload is the debug rendering of the server's response.
    #[error("send rejected: {0}")]
    SendRejected(String),
}
69
/// Asynchronous interface to an indexer service.
///
/// Each operation returns a future resolving to the requested value or to the
/// implementor's [`Indexer::Error`].
pub trait Indexer {
    /// Error type produced by every operation of this indexer.
    type Error;

    /// Fetches the indexer's `LightdInfo` record.
    fn get_info(&self) -> impl Future<Output = Result<LightdInfo, Self::Error>>;
    /// Fetches the `BlockId` the indexer reports as its latest block.
    fn get_latest_block(&self) -> impl Future<Output = Result<BlockId, Self::Error>>;
    /// Submits the raw transaction bytes in `tx_bytes`.
    ///
    /// On success the returned `String` is the value reported back by the
    /// indexer (the gRPC implementation in this file returns the transaction
    /// id taken from the server's response message).
    fn send_transaction(
        &self,
        tx_bytes: Box<[u8]>,
    ) -> impl Future<Output = Result<String, Self::Error>>;
    /// Fetches the `TreeState` for the block at `height`.
    fn get_trees(&self, height: u64) -> impl Future<Output = Result<TreeState, Self::Error>>;
}
82
/// Handle to a gRPC indexer identified by an optional URI.
///
/// The handle stores only the URI, not an open connection.
#[derive(Clone, Debug)]
pub struct GrpcIndexer {
    /// Target server URI; `None` means the handle is disconnected.
    uri: Option<http::Uri>,
}
88
89impl GrpcIndexer {
90 pub fn new(uri: http::Uri) -> Self {
91 Self { uri: Some(uri) }
92 }
93
94 pub fn disconnected() -> Self {
95 Self { uri: None }
96 }
97
98 pub fn uri(&self) -> Option<&http::Uri> {
99 self.uri.as_ref()
100 }
101
102 pub fn set_uri(&mut self, uri: http::Uri) {
103 self.uri = Some(uri);
104 }
105
106 pub fn disconnect(&mut self) {
107 self.uri = None;
108 }
109
110 pub async fn get_client(&self) -> Result<CompactTxStreamerClient<Channel>, GetClientError> {
118 let uri = self.uri.as_ref().ok_or(GetClientError::NoUri)?;
119 let scheme = uri.scheme_str().ok_or(GetClientError::InvalidScheme)?;
120 if scheme != "http" && scheme != "https" {
121 return Err(GetClientError::InvalidScheme);
122 }
123 let _authority = uri.authority().ok_or(GetClientError::InvalidAuthority)?;
124
125 let endpoint = Endpoint::from_shared(uri.to_string())?.tcp_nodelay(true);
126
127 let channel = if scheme == "https" {
128 let tls = client_tls_config()?;
129 endpoint.tls_config(tls)?.connect().await?
130 } else {
131 endpoint.connect().await?
132 };
133
134 Ok(CompactTxStreamerClient::new(channel))
135 }
136}
137
138impl Indexer for GrpcIndexer {
139 type Error = GrpcIndexerError;
140
141 async fn get_info(&self) -> Result<LightdInfo, GrpcIndexerError> {
142 let mut client = self.get_client().await?;
143 let mut request = Request::new(Empty {});
144 request.set_timeout(DEFAULT_GRPC_TIMEOUT);
145 let response = client.get_lightd_info(request).await?;
146 Ok(response.into_inner())
147 }
148
149 async fn get_latest_block(&self) -> Result<BlockId, GrpcIndexerError> {
150 let mut client = self.get_client().await?;
151 let mut request = Request::new(ChainSpec {});
152 request.set_timeout(DEFAULT_GRPC_TIMEOUT);
153 let response = client.get_latest_block(request).await?;
154 Ok(response.into_inner())
155 }
156
157 async fn send_transaction(&self, tx_bytes: Box<[u8]>) -> Result<String, GrpcIndexerError> {
158 let mut client = self.get_client().await?;
159 let mut request = Request::new(RawTransaction {
160 data: tx_bytes.to_vec(),
161 height: 0,
162 });
163 request.set_timeout(DEFAULT_GRPC_TIMEOUT);
164 let response = client.send_transaction(request).await?;
165 let sendresponse = response.into_inner();
166 if sendresponse.error_code == 0 {
167 let mut transaction_id = sendresponse.error_message;
168 if transaction_id.starts_with('\"') && transaction_id.ends_with('\"') {
169 transaction_id = transaction_id[1..transaction_id.len() - 1].to_string();
170 }
171 Ok(transaction_id)
172 } else {
173 Err(GrpcIndexerError::SendRejected(format!("{sendresponse:?}")))
174 }
175 }
176
177 async fn get_trees(&self, height: u64) -> Result<TreeState, GrpcIndexerError> {
178 let mut client = self.get_client().await?;
179 let response = client
180 .get_tree_state(Request::new(BlockId {
181 height,
182 hash: vec![],
183 }))
184 .await?;
185 Ok(response.into_inner())
186 }
187}
188
/// Adds the certificates from `test-data/localhost.pem` to `roots`.
///
/// If the file cannot be opened a notice is printed to stderr and `roots` is
/// left untouched. PEM sections that fail to decode are skipped.
#[cfg(test)]
fn add_test_cert_to_roots(roots: &mut RootCertStore) {
    const TEST_PEMFILE_PATH: &str = "test-data/localhost.pem";

    eprintln!("Adding test cert to roots");

    let pem_file = match std::fs::File::open(TEST_PEMFILE_PATH) {
        Ok(file) => file,
        Err(_) => {
            eprintln!("Test TLS cert not found at {TEST_PEMFILE_PATH}, skipping");
            return;
        }
    };

    let mut reader = std::io::BufReader::new(pem_file);
    // Feed the successfully decoded certificates straight into the store.
    let decoded_certs = rustls_pemfile::certs(&mut reader).filter_map(Result::ok);
    roots.add_parsable_certificates(decoded_certs);
}
209
// Tests for the TLS test fixtures (`test-data/localhost.pem` / `.key`) and
// for client construction in `GrpcIndexer`.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use http::{Request, Response};
    use hyper::{
        body::{Bytes, Incoming},
        service::service_fn,
    };
    use hyper_util::rt::TokioIo;
    use tokio::{net::TcpListener, sync::oneshot, time::timeout};
    use tokio_rustls::{TlsAcceptor, rustls};

    use super::*;

    /// The certificate fixture must exist, contain at least one certificate,
    /// and every PEM section in it must decode and parse as X.509.
    #[test]
    fn localhost_cert_file_exists_and_is_parseable() {
        const CERT_PATH: &str = "test-data/localhost.pem";

        let pem = std::fs::read(CERT_PATH).expect("missing test-data/localhost.pem");

        let mut cursor = std::io::BufReader::new(pem.as_slice());
        // Collect into a `Result` so a malformed PEM section fails the test
        // instead of being silently skipped.
        let certs = rustls_pemfile::certs(&mut cursor)
            .collect::<Result<Vec<_>, _>>()
            .expect("malformed PEM section in test-data/localhost.pem");

        assert!(!certs.is_empty(), "no certs found in {CERT_PATH}");

        for cert in certs {
            let der = cert.as_ref();
            let parsed = x509_parser::parse_x509_certificate(der);
            assert!(
                parsed.is_ok(),
                "failed to parse a cert from {CERT_PATH} as X.509"
            );
        }
    }

    /// The first certificate in the fixture must carry a basic-constraints
    /// extension that marks it as not being a CA.
    #[test]
    fn localhost_cert_is_end_entity_not_ca() {
        let pem =
            std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
        let mut cursor = std::io::BufReader::new(pem.as_slice());

        let certs = rustls_pemfile::certs(&mut cursor)
            .filter_map(Result::ok)
            .collect::<Vec<_>>();

        assert!(!certs.is_empty(), "no certs found in localhost.pem");

        let der = certs[0].as_ref();
        let parsed = x509_parser::parse_x509_certificate(der).expect("failed to parse X.509");
        let x509 = parsed.1;

        // The outer `Result` reports a broken extension; the inner `Option`
        // reports an absent one. Give each failure its own message.
        let constraints = x509
            .basic_constraints()
            .expect("invalid basic constraints extension")
            .expect("missing basic constraints extension");

        assert!(!constraints.value.ca, "localhost.pem must be CA:FALSE");
    }

    /// Builds a rustls server configuration (no client auth) from the
    /// certificate and private-key fixtures in `test-data/`.
    fn load_test_server_config() -> std::sync::Arc<rustls::ServerConfig> {
        let cert_pem =
            std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
        let key_pem =
            std::fs::read("test-data/localhost.key").expect("missing test-data/localhost.key");

        let mut cert_cursor = std::io::BufReader::new(cert_pem.as_slice());
        let mut key_cursor = std::io::BufReader::new(key_pem.as_slice());

        let certs = rustls_pemfile::certs(&mut cert_cursor)
            .filter_map(Result::ok)
            .map(rustls::pki_types::CertificateDer::from)
            .collect::<Vec<_>>();

        let key = rustls_pemfile::private_key(&mut key_cursor)
            .expect("failed to read private key")
            .expect("no private key found");

        let config = rustls::ServerConfig::builder()
            .with_no_client_auth()
            .with_single_cert(certs, key)
            .expect("bad cert or key");

        std::sync::Arc::new(config)
    }

    /// A client whose only trust anchors come from `add_test_cert_to_roots`
    /// must complete a TLS handshake and an HTTP/1 request against a local
    /// server that presents the fixture certificate.
    #[tokio::test]
    async fn add_test_cert_to_roots_enables_tls_handshake() {
        use http_body_util::Full;

        let _ = rustls::crypto::ring::default_provider().install_default();

        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
        let addr = listener.local_addr().expect("local_addr failed");

        let tls_config = load_test_server_config();
        let acceptor = TlsAcceptor::from(tls_config);

        let (ready_tx, ready_rx) = oneshot::channel::<()>();

        // One-shot server: accept a single connection, answer one request.
        let server_task = tokio::spawn(async move {
            let _ = ready_tx.send(());

            let accept_res = timeout(Duration::from_secs(3), listener.accept()).await;
            let (socket, _) = accept_res
                .expect("server accept timed out")
                .expect("accept failed");

            let tls_stream = timeout(Duration::from_secs(3), acceptor.accept(socket))
                .await
                .expect("tls accept timed out")
                .expect("tls accept failed");

            let io = TokioIo::new(tls_stream);

            let svc = service_fn(|mut req: http::Request<hyper::body::Incoming>| async move {
                use http_body_util::BodyExt;

                // Drain the request body before responding.
                while let Some(frame) = req.body_mut().frame().await {
                    if frame.is_err() {
                        break;
                    }
                }

                let mut resp = http::Response::new(Full::new(Bytes::from_static(b"ok")));
                resp.headers_mut().insert(
                    http::header::CONNECTION,
                    http::HeaderValue::from_static("close"),
                );
                Ok::<_, hyper::Error>(resp)
            });

            timeout(
                Duration::from_secs(3),
                hyper::server::conn::http1::Builder::new()
                    .keep_alive(false)
                    .serve_connection(io, svc),
            )
            .await
            .expect("serve_connection timed out")
            .expect("serve_connection failed");
        });

        let _ = timeout(Duration::from_secs(1), ready_rx)
            .await
            .expect("server ready signal timed out")
            .expect("server dropped before ready");

        let mut roots = rustls::RootCertStore::empty();
        add_test_cert_to_roots(&mut roots);

        let client_config = rustls::ClientConfig::builder()
            .with_root_certificates(roots)
            .with_no_client_auth();

        let https = hyper_rustls::HttpsConnectorBuilder::new()
            .with_tls_config(client_config)
            .https_only()
            .enable_http1()
            .build();

        let client =
            hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
                .build(https);

        let uri: http::Uri = format!("https://127.0.0.1:{}/", addr.port())
            .parse()
            .expect("bad uri");

        let req = http::Request::builder()
            .method("GET")
            .uri(uri)
            .body(Full::<Bytes>::new(Bytes::new()))
            .expect("request build failed");

        let res = timeout(Duration::from_secs(3), client.request(req))
            .await
            .expect("client request timed out")
            .expect("TLS handshake or request failed");

        assert!(res.status().is_success());

        timeout(Duration::from_secs(3), server_task)
            .await
            .expect("server task timed out")
            .expect("server task failed");
    }

    /// `get_client` must reject a URI whose scheme is neither `http` nor
    /// `https` with the dedicated scheme error, not some unrelated failure.
    #[tokio::test]
    async fn rejects_non_http_schemes() {
        let uri: http::Uri = "ftp://example.com:1234".parse().unwrap();
        let res = GrpcIndexer::new(uri).get_client().await;

        assert!(
            matches!(res, Err(GetClientError::InvalidScheme)),
            "expected get_client() to reject non-http(s) schemes with InvalidScheme"
        );
    }

    /// Connecting a tonic endpoint over TLS to a server that only speaks
    /// HTTP/1 must fail rather than silently downgrade.
    #[tokio::test]
    async fn https_connector_must_not_downgrade_to_http1() {
        use http_body_util::Full;

        let _ = rustls::crypto::ring::default_provider().install_default();

        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
        let addr = listener.local_addr().expect("local_addr failed");

        let tls_config = load_test_server_config();
        let acceptor = TlsAcceptor::from(tls_config);

        let server_task = tokio::spawn(async move {
            let accept_res = timeout(Duration::from_secs(3), listener.accept()).await;
            let (socket, _) = accept_res
                .expect("server accept timed out")
                .expect("accept failed");

            let tls_stream = acceptor.accept(socket).await.expect("tls accept failed");
            let io = TokioIo::new(tls_stream);

            let svc = service_fn(|_req: Request<Incoming>| async move {
                Ok::<_, hyper::Error>(Response::new(Full::new(Bytes::from_static(b"ok"))))
            });

            let _ = hyper::server::conn::http1::Builder::new()
                .serve_connection(io, svc)
                .await;
        });

        let base = format!("https://127.0.0.1:{}", addr.port());
        let uri = base.parse::<http::Uri>().expect("bad base uri");

        let endpoint = tonic::transport::Endpoint::from_shared(uri.to_string())
            .expect("endpoint")
            .tcp_nodelay(true);

        let tls = client_tls_config().expect("tls config");
        let connect_res = endpoint
            .tls_config(tls)
            .expect("tls_config failed")
            .connect()
            .await;

        assert!(
            connect_res.is_err(),
            "expected connect to fail (no downgrade to HTTP/1.1), but it succeeded"
        );

        server_task.abort();
    }

    /// Live check against a public server: connects to `https://zec.rocks:443`
    /// and validates the `GetLightdInfo` response. Requires network access.
    #[tokio::test]
    async fn connects_to_public_mainnet_indexer_and_gets_info() {
        // `tonic::Request` deliberately shadows the module-level `http::Request`.
        use tonic::Request;
        use zcash_client_backend::proto::service::Empty;

        let _ = rustls::crypto::ring::default_provider().install_default();

        let endpoint = "https://zec.rocks:443".to_string();

        let uri: http::Uri = endpoint.parse().expect("bad mainnet indexer URI");

        let mut client = timeout(Duration::from_secs(10), GrpcIndexer::new(uri).get_client())
            .await
            .expect("timed out connecting to public indexer")
            .expect("failed to connect to public indexer");

        let response = timeout(
            Duration::from_secs(10),
            client.get_lightd_info(Request::new(Empty {})),
        )
        .await
        .expect("timed out calling GetLightdInfo")
        .expect("GetLightdInfo RPC failed")
        .into_inner();

        assert!(
            !response.chain_name.is_empty(),
            "chain_name should not be empty"
        );
        assert!(
            response.block_height > 0,
            "block_height should be > 0, got {}",
            response.block_height
        );

        let chain = response.chain_name.to_ascii_lowercase();
        assert!(
            chain.contains("main"),
            "expected a mainnet server, got chain_name={:?}",
            response.chain_name
        );
    }
}