1use std::future::Future;
42use std::time::Duration;
43
44use tonic::Request;
45use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
46
47pub use lightwallet_protocol;
48
49use lightwallet_protocol::{
50 BlockId, BlockRange, ChainSpec, CompactBlock, CompactTx, CompactTxStreamerClient, Empty,
51 GetMempoolTxRequest, GetSubtreeRootsArg, LightdInfo, RawTransaction, SubtreeRoot, TreeState,
52 TxFilter,
53};
54
55#[cfg(feature = "ping-very-insecure")]
56use lightwallet_protocol::{Duration as ProtoDuration, PingResponse};
57
58pub mod error;
59pub use error::*;
60
61#[cfg(feature = "globally-public-transparent")]
62mod globally_public;
63#[cfg(feature = "globally-public-transparent")]
64pub use globally_public::TransparentIndexer;
65
66fn client_tls_config() -> ClientTlsConfig {
67 #[cfg(test)]
69 {
70 ClientTlsConfig::new()
71 .ca_certificate(tonic::transport::Certificate::from_pem(
72 std::fs::read("test-data/localhost.pem").expect("test file"),
73 ))
74 .with_webpki_roots()
75 }
76 #[cfg(not(test))]
77 ClientTlsConfig::new().with_webpki_roots()
78}
79
/// Timeout (10 seconds) set on every request built by
/// `GrpcIndexer::time_boxed_call`; requests built by `stream_call` do not use it.
const DEFAULT_GRPC_TIMEOUT: Duration = Duration::from_secs(10);
81
/// Asynchronous interface to a lightwallet indexer.
///
/// Every method returns a future that resolves to the protocol response type
/// or to the method's own associated error type, so implementations can report
/// a distinct error per operation.
pub trait Indexer {
    /// Error returned by [`Indexer::get_info`].
    type GetInfoError: std::error::Error;
    /// Error returned by [`Indexer::get_latest_block`].
    type GetLatestBlockError: std::error::Error;
    /// Error returned by [`Indexer::send_transaction`].
    type SendTransactionError: std::error::Error;
    /// Error returned by [`Indexer::get_tree_state`].
    type GetTreeStateError: std::error::Error;
    /// Error returned by [`Indexer::get_block`].
    type GetBlockError: std::error::Error;
    /// Error returned by [`Indexer::get_block_nullifiers`].
    type GetBlockNullifiersError: std::error::Error;
    /// Error returned by [`Indexer::get_block_range`].
    type GetBlockRangeError: std::error::Error;
    /// Error returned by [`Indexer::get_block_range_nullifiers`].
    type GetBlockRangeNullifiersError: std::error::Error;
    /// Error returned by [`Indexer::get_transaction`].
    type GetTransactionError: std::error::Error;
    /// Error returned by [`Indexer::get_mempool_tx`].
    type GetMempoolTxError: std::error::Error;
    /// Error returned by [`Indexer::get_mempool_stream`].
    type GetMempoolStreamError: std::error::Error;
    /// Error returned by [`Indexer::get_latest_tree_state`].
    type GetLatestTreeStateError: std::error::Error;
    /// Error returned by [`Indexer::get_subtree_roots`].
    type GetSubtreeRootsError: std::error::Error;

    /// Error returned by [`Indexer::ping`]; only present with the
    /// `ping-very-insecure` feature.
    #[cfg(feature = "ping-very-insecure")]
    type PingError: std::error::Error;

    /// Resolves to the indexer's [`LightdInfo`].
    fn get_info(&self) -> impl Future<Output = Result<LightdInfo, Self::GetInfoError>>;

    /// Resolves to the [`BlockId`] of the latest block known to the indexer.
    fn get_latest_block(&self) -> impl Future<Output = Result<BlockId, Self::GetLatestBlockError>>;

    /// Submits the raw transaction bytes in `tx_bytes`.
    ///
    /// On success the future resolves to a `String`; the `GrpcIndexer`
    /// implementation in this file returns the transaction id reported by the
    /// server.
    fn send_transaction(
        &self,
        tx_bytes: Box<[u8]>,
    ) -> impl Future<Output = Result<String, Self::SendTransactionError>>;

    /// Resolves to the [`TreeState`] for the block identified by `block_id`.
    fn get_tree_state(
        &self,
        block_id: BlockId,
    ) -> impl Future<Output = Result<TreeState, Self::GetTreeStateError>>;

    /// Resolves to the [`CompactBlock`] identified by `block_id`.
    fn get_block(
        &self,
        block_id: BlockId,
    ) -> impl Future<Output = Result<CompactBlock, Self::GetBlockError>>;

    /// Nullifier-oriented variant of [`Indexer::get_block`]; deprecated in
    /// favour of `get_block`.
    #[deprecated(note = "use get_block instead")]
    fn get_block_nullifiers(
        &self,
        block_id: BlockId,
    ) -> impl Future<Output = Result<CompactBlock, Self::GetBlockNullifiersError>>;

    /// Resolves to a stream of [`CompactBlock`]s for the requested `range`.
    fn get_block_range(
        &self,
        range: BlockRange,
    ) -> impl Future<Output = Result<tonic::Streaming<CompactBlock>, Self::GetBlockRangeError>>;

    /// Nullifier-oriented variant of [`Indexer::get_block_range`]; deprecated
    /// in favour of `get_block_range`.
    #[deprecated(note = "use get_block_range instead")]
    fn get_block_range_nullifiers(
        &self,
        range: BlockRange,
    ) -> impl Future<Output = Result<tonic::Streaming<CompactBlock>, Self::GetBlockRangeNullifiersError>>;

    /// Resolves to the [`RawTransaction`] selected by `filter`.
    fn get_transaction(
        &self,
        filter: TxFilter,
    ) -> impl Future<Output = Result<RawTransaction, Self::GetTransactionError>>;

    /// Resolves to a stream of [`CompactTx`] items for the given mempool
    /// `request`.
    fn get_mempool_tx(
        &self,
        request: GetMempoolTxRequest,
    ) -> impl Future<Output = Result<tonic::Streaming<CompactTx>, Self::GetMempoolTxError>>;

    /// Resolves to a stream of [`RawTransaction`]s from the mempool.
    fn get_mempool_stream(
        &self,
    ) -> impl Future<Output = Result<tonic::Streaming<RawTransaction>, Self::GetMempoolStreamError>>;

    /// Resolves to the most recent [`TreeState`] known to the indexer.
    fn get_latest_tree_state(
        &self,
    ) -> impl Future<Output = Result<TreeState, Self::GetLatestTreeStateError>>;

    /// Resolves to a stream of [`SubtreeRoot`]s selected by `arg`.
    fn get_subtree_roots(
        &self,
        arg: GetSubtreeRootsArg,
    ) -> impl Future<Output = Result<tonic::Streaming<SubtreeRoot>, Self::GetSubtreeRootsError>>;

    /// Sends a ping carrying `duration` and resolves to the [`PingResponse`].
    ///
    /// Only available with the `ping-very-insecure` feature.
    #[cfg(feature = "ping-very-insecure")]
    fn ping(
        &self,
        duration: ProtoDuration,
    ) -> impl Future<Output = Result<PingResponse, Self::PingError>>;
}
245
/// gRPC client handle for a lightwallet indexer reachable at a fixed URI.
///
/// The handle stores a pre-configured [`Endpoint`]; a channel is opened from
/// it each time a client is requested.
#[derive(Clone)]
pub struct GrpcIndexer {
    /// URI this indexer was constructed from.
    uri: http::Uri,
    /// Scheme of `uri`; the constructor only accepts `http` and `https`.
    scheme: String,
    /// Authority component of `uri`.
    authority: http::uri::Authority,
    /// Endpoint built from `uri`, with TLS configured when the scheme is `https`.
    endpoint: Endpoint,
}
254
255impl std::fmt::Debug for GrpcIndexer {
256 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257 f.debug_struct("GrpcIndexer")
258 .field("scheme", &self.scheme)
259 .field("authority", &self.authority)
260 .finish_non_exhaustive()
261 }
262}
263
264impl GrpcIndexer {
265 pub fn new(uri: http::Uri) -> Result<Self, GetClientError> {
266 let scheme = uri
267 .scheme_str()
268 .ok_or(GetClientError::InvalidScheme)?
269 .to_string();
270 if scheme != "http" && scheme != "https" {
271 return Err(GetClientError::InvalidScheme);
272 }
273 let authority = uri
274 .authority()
275 .ok_or(GetClientError::InvalidAuthority)?
276 .clone();
277
278 let endpoint = Endpoint::from_shared(uri.to_string())?.tcp_nodelay(true);
279 let endpoint = if scheme == "https" {
280 endpoint.tls_config(client_tls_config())?
281 } else {
282 endpoint
283 };
284
285 Ok(Self {
286 uri,
287 scheme,
288 authority,
289 endpoint,
290 })
291 }
292
293 pub fn uri(&self) -> &http::Uri {
294 &self.uri
295 }
296
297 pub async fn get_client(&self) -> Result<CompactTxStreamerClient<Channel>, GetClientError> {
299 let channel = self.endpoint.connect().await?;
300 Ok(CompactTxStreamerClient::new(channel))
301 }
302
303 async fn time_boxed_call<T>(
304 &self,
305 payload: T,
306 ) -> Result<(CompactTxStreamerClient<Channel>, Request<T>), GetClientError> {
307 let client = self.get_client().await?;
308 let mut request = Request::new(payload);
309 request.set_timeout(DEFAULT_GRPC_TIMEOUT);
310 Ok((client, request))
311 }
312
313 async fn stream_call<T>(
314 &self,
315 payload: T,
316 ) -> Result<(CompactTxStreamerClient<Channel>, Request<T>), GetClientError> {
317 let client = self.get_client().await?;
318 Ok((client, Request::new(payload)))
319 }
320}
321
#[cfg(feature = "back_compatible")]
impl GrpcIndexer {
    /// Connects the stored endpoint and wraps the channel in the
    /// `zcash_client_backend` flavour of `CompactTxStreamerClient`.
    ///
    /// Only available with the `back_compatible` feature.
    pub async fn get_zcb_client(
        &self,
    ) -> Result<
        zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient<
            Channel,
        >,
        GetClientError,
    > {
        // Aliased so it cannot be confused with the lightwallet_protocol client
        // of the same name imported at the top of the file.
        use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient as ZcbClient;

        let channel = self.endpoint.connect().await?;
        Ok(ZcbClient::new(channel))
    }
}
341
342impl Indexer for GrpcIndexer {
343 type GetInfoError = GetInfoError;
344 type GetLatestBlockError = GetLatestBlockError;
345 type SendTransactionError = SendTransactionError;
346 type GetTreeStateError = GetTreeStateError;
347 type GetBlockError = GetBlockError;
348 type GetBlockNullifiersError = GetBlockNullifiersError;
349 type GetBlockRangeError = GetBlockRangeError;
350 type GetBlockRangeNullifiersError = GetBlockRangeNullifiersError;
351 type GetTransactionError = GetTransactionError;
352 type GetMempoolTxError = GetMempoolTxError;
353 type GetMempoolStreamError = GetMempoolStreamError;
354 type GetLatestTreeStateError = GetLatestTreeStateError;
355 type GetSubtreeRootsError = GetSubtreeRootsError;
356 #[cfg(feature = "ping-very-insecure")]
357 type PingError = PingError;
358
359 async fn get_info(&self) -> Result<LightdInfo, GetInfoError> {
360 let (mut client, request) = self.time_boxed_call(Empty {}).await?;
361 Ok(client.get_lightd_info(request).await?.into_inner())
362 }
363
364 async fn get_latest_block(&self) -> Result<BlockId, GetLatestBlockError> {
365 let (mut client, request) = self.time_boxed_call(ChainSpec {}).await?;
366 Ok(client.get_latest_block(request).await?.into_inner())
367 }
368
369 async fn send_transaction(&self, tx_bytes: Box<[u8]>) -> Result<String, SendTransactionError> {
370 let (mut client, request) = self
371 .time_boxed_call(RawTransaction {
372 data: tx_bytes.to_vec(),
373 height: 0,
374 })
375 .await?;
376 let sendresponse = client.send_transaction(request).await?.into_inner();
377 if sendresponse.error_code == 0 {
378 let mut transaction_id = sendresponse.error_message;
379 if transaction_id.starts_with('\"') && transaction_id.ends_with('\"') {
380 transaction_id = transaction_id[1..transaction_id.len() - 1].to_string();
381 }
382 Ok(transaction_id)
383 } else {
384 Err(SendTransactionError::SendRejected(format!(
385 "{sendresponse:?}"
386 )))
387 }
388 }
389
390 async fn get_tree_state(&self, block_id: BlockId) -> Result<TreeState, GetTreeStateError> {
391 let (mut client, request) = self.time_boxed_call(block_id).await?;
392 Ok(client.get_tree_state(request).await?.into_inner())
393 }
394
395 async fn get_block(&self, block_id: BlockId) -> Result<CompactBlock, GetBlockError> {
396 let (mut client, request) = self.time_boxed_call(block_id).await?;
397 Ok(client.get_block(request).await?.into_inner())
398 }
399
400 #[allow(deprecated)]
401 async fn get_block_nullifiers(
402 &self,
403 block_id: BlockId,
404 ) -> Result<CompactBlock, GetBlockNullifiersError> {
405 let (mut client, request) = self.time_boxed_call(block_id).await?;
406 Ok(client.get_block_nullifiers(request).await?.into_inner())
407 }
408
409 async fn get_block_range(
410 &self,
411 range: BlockRange,
412 ) -> Result<tonic::Streaming<CompactBlock>, GetBlockRangeError> {
413 let (mut client, request) = self.stream_call(range).await?;
414 Ok(client.get_block_range(request).await?.into_inner())
415 }
416
417 #[allow(deprecated)]
418 async fn get_block_range_nullifiers(
419 &self,
420 range: BlockRange,
421 ) -> Result<tonic::Streaming<CompactBlock>, GetBlockRangeNullifiersError> {
422 let (mut client, request) = self.stream_call(range).await?;
423 Ok(client
424 .get_block_range_nullifiers(request)
425 .await?
426 .into_inner())
427 }
428
429 async fn get_transaction(
430 &self,
431 filter: TxFilter,
432 ) -> Result<RawTransaction, GetTransactionError> {
433 let (mut client, request) = self.time_boxed_call(filter).await?;
434 Ok(client.get_transaction(request).await?.into_inner())
435 }
436
437 async fn get_mempool_tx(
438 &self,
439 request: GetMempoolTxRequest,
440 ) -> Result<tonic::Streaming<CompactTx>, GetMempoolTxError> {
441 let (mut client, request) = self.stream_call(request).await?;
442 Ok(client.get_mempool_tx(request).await?.into_inner())
443 }
444
445 async fn get_mempool_stream(
446 &self,
447 ) -> Result<tonic::Streaming<RawTransaction>, GetMempoolStreamError> {
448 let (mut client, request) = self.stream_call(Empty {}).await?;
449 Ok(client.get_mempool_stream(request).await?.into_inner())
450 }
451
452 async fn get_latest_tree_state(&self) -> Result<TreeState, GetLatestTreeStateError> {
453 let (mut client, request) = self.time_boxed_call(Empty {}).await?;
454 Ok(client.get_latest_tree_state(request).await?.into_inner())
455 }
456
457 async fn get_subtree_roots(
458 &self,
459 arg: GetSubtreeRootsArg,
460 ) -> Result<tonic::Streaming<SubtreeRoot>, GetSubtreeRootsError> {
461 let (mut client, request) = self.stream_call(arg).await?;
462 Ok(client.get_subtree_roots(request).await?.into_inner())
463 }
464
465 #[cfg(feature = "ping-very-insecure")]
466 async fn ping(&self, duration: ProtoDuration) -> Result<PingResponse, PingError> {
467 let (mut client, request) = self.time_boxed_call(duration).await?;
468 Ok(client.ping(request).await?.into_inner())
469 }
470}
471
472#[cfg(test)]
473mod proto_agreement;
474
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use http::{Request, Response};
    use hyper::{
        body::{Bytes, Incoming},
        service::service_fn,
    };
    use hyper_util::rt::TokioIo;
    use tokio::{net::TcpListener, sync::oneshot, time::timeout};
    use tokio_rustls::{TlsAcceptor, rustls};

    use super::*;

    use tokio_rustls::rustls::RootCertStore;

    /// Adds every parseable certificate from `test-data/localhost.pem` to
    /// `roots`. If the file cannot be opened, a note is printed to stderr and
    /// `roots` is left untouched.
    fn add_test_cert_to_roots(roots: &mut RootCertStore) {
        use tonic::transport::CertificateDer;

        const TEST_PEMFILE_PATH: &str = "test-data/localhost.pem";

        eprintln!("Adding test cert to roots");

        let Ok(fd) = std::fs::File::open(TEST_PEMFILE_PATH) else {
            eprintln!("Test TLS cert not found at {TEST_PEMFILE_PATH}, skipping");
            return;
        };

        let mut reader = std::io::BufReader::new(fd);
        // PEM entries that fail to decode are skipped rather than treated as fatal.
        let certs: Vec<CertificateDer<'_>> = rustls_pemfile::certs(&mut reader)
            .filter_map(Result::ok)
            .collect();
        roots.add_parsable_certificates(certs);
    }

    /// The bundled test certificate must exist and every certificate in it
    /// must parse as X.509.
    #[test]
    fn localhost_cert_file_exists_and_is_parseable() {
        const CERT_PATH: &str = "test-data/localhost.pem";

        let pem = std::fs::read(CERT_PATH).expect("missing test-data/localhost.pem");

        let mut cursor = std::io::BufReader::new(pem.as_slice());
        let certs = rustls_pemfile::certs(&mut cursor)
            .filter_map(Result::ok)
            .collect::<Vec<_>>();

        assert!(!certs.is_empty(), "no certs found in {CERT_PATH}");

        for cert in &certs {
            assert!(
                x509_parser::parse_x509_certificate(cert.as_ref()).is_ok(),
                "failed to parse a cert from {CERT_PATH} as X.509"
            );
        }
    }

    /// The first certificate in the bundled PEM must carry a basic-constraints
    /// extension with `CA:FALSE`.
    #[test]
    fn localhost_cert_is_end_entity_not_ca() {
        let pem =
            std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
        let mut cursor = std::io::BufReader::new(pem.as_slice());

        let certs = rustls_pemfile::certs(&mut cursor)
            .filter_map(Result::ok)
            .collect::<Vec<_>>();

        assert!(!certs.is_empty(), "no certs found in localhost.pem");

        let (_, x509) = x509_parser::parse_x509_certificate(certs[0].as_ref())
            .expect("failed to parse X.509");

        // `basic_constraints()` yields a `Result` wrapping an `Option`: the
        // outer layer reports a lookup failure, the inner layer reports that
        // the extension is absent. Each layer gets its own message.
        let constraints = x509
            .basic_constraints()
            .expect("basic constraints extension could not be read")
            .expect("missing basic constraints extension");

        assert!(!constraints.value.ca, "localhost.pem must be CA:FALSE");
    }

    /// Builds a rustls server configuration (no client auth) from
    /// `test-data/localhost.pem` and `test-data/localhost.key`.
    ///
    /// Panics if either file is missing or the key/certificate pair is rejected.
    fn load_test_server_config() -> std::sync::Arc<rustls::ServerConfig> {
        let cert_pem =
            std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
        let key_pem =
            std::fs::read("test-data/localhost.key").expect("missing test-data/localhost.key");

        let mut cert_cursor = std::io::BufReader::new(cert_pem.as_slice());
        let mut key_cursor = std::io::BufReader::new(key_pem.as_slice());

        let certs = rustls_pemfile::certs(&mut cert_cursor)
            .filter_map(Result::ok)
            .collect::<Vec<_>>();

        let key = rustls_pemfile::private_key(&mut key_cursor)
            .expect("failed to read private key")
            .expect("no private key found");

        let config = rustls::ServerConfig::builder()
            .with_no_client_auth()
            .with_single_cert(certs, key)
            .expect("bad cert or key");

        std::sync::Arc::new(config)
    }

    /// A client whose only trust roots come from `add_test_cert_to_roots` must
    /// complete an HTTPS request against a local server using the test cert.
    #[tokio::test]
    async fn add_test_cert_to_roots_enables_tls_handshake() {
        use http_body_util::Full;

        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
        let addr = listener.local_addr().expect("local_addr failed");

        let acceptor = TlsAcceptor::from(load_test_server_config());

        let (ready_tx, ready_rx) = oneshot::channel::<()>();

        // One-shot HTTPS server: accepts a single connection, drains the
        // request body, answers "ok" and closes.
        let server_task = tokio::spawn(async move {
            let _ = ready_tx.send(());

            let (socket, _) = timeout(Duration::from_secs(3), listener.accept())
                .await
                .expect("server accept timed out")
                .expect("accept failed");

            let tls_stream = timeout(Duration::from_secs(3), acceptor.accept(socket))
                .await
                .expect("tls accept timed out")
                .expect("tls accept failed");

            let io = TokioIo::new(tls_stream);

            let svc = service_fn(|mut req: Request<Incoming>| async move {
                use http_body_util::BodyExt;

                while let Some(frame) = req.body_mut().frame().await {
                    if frame.is_err() {
                        break;
                    }
                }

                let mut resp = Response::new(Full::new(Bytes::from_static(b"ok")));
                resp.headers_mut().insert(
                    http::header::CONNECTION,
                    http::HeaderValue::from_static("close"),
                );
                Ok::<_, hyper::Error>(resp)
            });

            timeout(
                Duration::from_secs(3),
                hyper::server::conn::http1::Builder::new()
                    .keep_alive(false)
                    .serve_connection(io, svc),
            )
            .await
            .expect("serve_connection timed out")
            .expect("serve_connection failed");
        });

        timeout(Duration::from_secs(1), ready_rx)
            .await
            .expect("server ready signal timed out")
            .expect("server dropped before ready");

        // Start from an empty store so the test cert is the only trust anchor.
        let mut roots = rustls::RootCertStore::empty();
        add_test_cert_to_roots(&mut roots);

        let client_config = rustls::ClientConfig::builder()
            .with_root_certificates(roots)
            .with_no_client_auth();

        let https = hyper_rustls::HttpsConnectorBuilder::new()
            .with_tls_config(client_config)
            .https_only()
            .enable_http1()
            .build();

        let client =
            hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
                .build(https);

        let uri: http::Uri = format!("https://127.0.0.1:{}/", addr.port())
            .parse()
            .expect("bad uri");

        let req = Request::builder()
            .method("GET")
            .uri(uri)
            .body(Full::<Bytes>::new(Bytes::new()))
            .expect("request build failed");

        let res = timeout(Duration::from_secs(3), client.request(req))
            .await
            .expect("client request timed out")
            .expect("TLS handshake or request failed");

        assert!(res.status().is_success());

        timeout(Duration::from_secs(3), server_task)
            .await
            .expect("server task timed out")
            .expect("server task failed");
    }

    /// `GrpcIndexer::new` must refuse URIs whose scheme is not `http`/`https`.
    #[test]
    fn rejects_non_http_schemes() {
        let uri: http::Uri = "ftp://example.com:1234".parse().unwrap();
        let res = GrpcIndexer::new(uri);

        assert!(
            res.is_err(),
            "expected GrpcIndexer::new() to reject non-http(s) schemes, but got Ok"
        );
    }

    /// Connecting the tonic endpoint to a TLS server that only serves HTTP/1
    /// must fail.
    #[tokio::test]
    async fn https_connector_must_not_downgrade_to_http1() {
        use http_body_util::Full;

        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
        let addr = listener.local_addr().expect("local_addr failed");

        let acceptor = TlsAcceptor::from(load_test_server_config());

        // HTTP/1-only TLS server; its outcome is irrelevant, it is aborted below.
        let server_task = tokio::spawn(async move {
            let (socket, _) = timeout(Duration::from_secs(3), listener.accept())
                .await
                .expect("server accept timed out")
                .expect("accept failed");

            let tls_stream = acceptor.accept(socket).await.expect("tls accept failed");
            let io = TokioIo::new(tls_stream);

            let svc = service_fn(|_req: Request<Incoming>| async move {
                Ok::<_, hyper::Error>(Response::new(Full::new(Bytes::from_static(b"ok"))))
            });

            let _ = hyper::server::conn::http1::Builder::new()
                .serve_connection(io, svc)
                .await;
        });

        let base = format!("https://127.0.0.1:{}", addr.port());
        let uri = base.parse::<http::Uri>().expect("bad base uri");

        let endpoint = tonic::transport::Endpoint::from_shared(uri.to_string())
            .expect("endpoint")
            .tcp_nodelay(true);

        let connect_res = endpoint
            .tls_config(client_tls_config())
            .expect("tls_config failed")
            .connect()
            .await;

        assert!(
            connect_res.is_err(),
            "expected connect to fail (no downgrade to HTTP/1.1), but it succeeded"
        );

        server_task.abort();
    }

    /// Live check against `https://zec.rocks:443`; needs network access.
    #[tokio::test]
    async fn connects_to_public_mainnet_indexer_and_gets_info() {
        let endpoint = "https://zec.rocks:443".to_string();

        let uri: http::Uri = endpoint.parse().expect("bad mainnet indexer URI");

        let response = GrpcIndexer::new(uri)
            .expect("URI to be valid.")
            .get_info()
            .await
            .expect("to get info");
        assert!(
            !response.chain_name.is_empty(),
            "chain_name should not be empty"
        );
        assert!(
            response.block_height > 0,
            "block_height should be > 0, got {}",
            response.block_height
        );

        let chain = response.chain_name.to_ascii_lowercase();
        assert!(
            chain.contains("main"),
            "expected a mainnet server, got chain_name={:?}",
            response.chain_name
        );
    }

    /// Live check against `https://zec.rocks:443` (needs network access): a
    /// range whose start is above its end must stream strictly descending heights.
    #[tokio::test]
    async fn get_block_range_supports_descending_order() {
        use tokio_stream::StreamExt;

        let uri: http::Uri = "https://zec.rocks:443".parse().unwrap();
        let indexer = GrpcIndexer::new(uri).expect("valid URI");

        let tip = indexer.get_latest_block().await.expect("get_latest_block");
        let start_height = tip.height;
        let end_height = start_height.saturating_sub(4);

        // start > end requests the blocks from the tip downwards.
        let range = BlockRange {
            start: Some(BlockId {
                height: start_height,
                hash: vec![],
            }),
            end: Some(BlockId {
                height: end_height,
                hash: vec![],
            }),
            pool_types: vec![],
        };

        let mut stream = indexer
            .get_block_range(range)
            .await
            .expect("get_block_range");

        let mut heights = Vec::new();
        while let Some(block) = stream.next().await {
            let block = block.expect("stream item");
            heights.push(block.height);
        }

        assert!(
            !heights.is_empty(),
            "expected at least one block in the descending range",
        );

        assert!(
            heights.windows(2).all(|pair| pair[0] > pair[1]),
            "expected descending order, but got heights: {heights:?}",
        );
    }
}