1use std::future::Future;
32use std::time::Duration;
33
34use tonic::Request;
35use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
36
37use lightwallet_protocol::{
38 BlockId, BlockRange, ChainSpec, CompactBlock, CompactTx, CompactTxStreamerClient, Empty,
39 GetMempoolTxRequest, GetSubtreeRootsArg, LightdInfo, RawTransaction, SubtreeRoot, TreeState,
40 TxFilter,
41};
42
43#[cfg(feature = "ping-very-insecure")]
44use lightwallet_protocol::{Duration as ProtoDuration, PingResponse};
45
46pub mod error;
47
48pub use error::*;
49pub use lightwallet_protocol;
50pub use tonic::{Status, Streaming};
51
52#[cfg(feature = "globally-public-transparent")]
53mod globally_public;
54#[cfg(feature = "globally-public-transparent")]
55pub use globally_public::TransparentIndexer;
56
57fn client_tls_config() -> ClientTlsConfig {
58 #[cfg(test)]
60 {
61 ClientTlsConfig::new()
62 .ca_certificate(tonic::transport::Certificate::from_pem(
63 std::fs::read("test-data/localhost.pem").expect("test file"),
64 ))
65 .with_webpki_roots()
66 }
67 #[cfg(not(test))]
68 ClientTlsConfig::new().with_webpki_roots()
69}
70
71pub trait Indexer {
79 fn get_lightd_info(
85 &mut self,
86 timeout: Duration,
87 ) -> impl Future<Output = Result<LightdInfo, tonic::Status>> + Send;
88
89 fn get_latest_block(
94 &mut self,
95 timeout: Duration,
96 ) -> impl Future<Output = Result<BlockId, tonic::Status>> + Send;
97
98 fn send_transaction(
105 &mut self,
106 tx: RawTransaction,
107 timeout: Duration,
108 ) -> impl Future<Output = Result<String, tonic::Status>> + Send;
109
110 fn get_tree_state(
116 &mut self,
117 block_id: BlockId,
118 timeout: Duration,
119 ) -> impl Future<Output = Result<TreeState, tonic::Status>> + Send;
120
121 fn get_block(
126 &mut self,
127 block_id: BlockId,
128 timeout: Duration,
129 ) -> impl Future<Output = Result<CompactBlock, tonic::Status>> + Send;
130
131 #[deprecated(note = "use get_block instead")]
136 fn get_block_nullifiers(
137 &mut self,
138 block_id: BlockId,
139 timeout: Duration,
140 ) -> impl Future<Output = Result<CompactBlock, tonic::Status>> + Send;
141
142 fn get_block_range(
152 &mut self,
153 range: BlockRange,
154 timeout: Duration,
155 ) -> impl Future<Output = Result<tonic::Streaming<CompactBlock>, tonic::Status>> + Send;
156
157 #[deprecated(note = "use get_block_range instead")]
163 fn get_block_range_nullifiers(
164 &mut self,
165 range: BlockRange,
166 timeout: Duration,
167 ) -> impl Future<Output = Result<tonic::Streaming<CompactBlock>, tonic::Status>> + Send;
168
169 fn get_transaction(
175 &mut self,
176 filter: TxFilter,
177 timeout: Duration,
178 ) -> impl Future<Output = Result<RawTransaction, tonic::Status>> + Send;
179
180 fn get_mempool_tx(
186 &mut self,
187 request: GetMempoolTxRequest,
188 timeout: Duration,
189 ) -> impl Future<Output = Result<tonic::Streaming<CompactTx>, tonic::Status>> + Send;
190
191 fn get_mempool_stream(
196 &mut self,
197 timeout: Duration,
198 ) -> impl Future<Output = Result<tonic::Streaming<RawTransaction>, tonic::Status>> + Send;
199
200 fn get_latest_tree_state(
205 &mut self,
206 timeout: Duration,
207 ) -> impl Future<Output = Result<TreeState, tonic::Status>> + Send;
208
209 fn get_subtree_roots(
214 &mut self,
215 arg: GetSubtreeRootsArg,
216 timeout: Duration,
217 ) -> impl Future<Output = Result<tonic::Streaming<SubtreeRoot>, tonic::Status>> + Send;
218
219 #[cfg(feature = "ping-very-insecure")]
226 fn ping(
227 &mut self,
228 duration: ProtoDuration,
229 timeout: Duration,
230 ) -> impl Future<Output = Result<PingResponse, tonic::Status>> + Send;
231}
232
233#[derive(Debug, Clone)]
235pub struct GrpcIndexer {
236 uri: http::Uri,
237 clear_net_client: CompactTxStreamerClient<Channel>,
238 }
240
241impl GrpcIndexer {
242 pub async fn new(uri: http::Uri) -> Result<Self, GetClientError> {
243 let scheme = uri
244 .scheme_str()
245 .ok_or(GetClientError::InvalidScheme)?
246 .to_string();
247 if scheme != "http" && scheme != "https" {
248 return Err(GetClientError::InvalidScheme);
249 }
250 let _authority = uri
251 .authority()
252 .ok_or(GetClientError::InvalidAuthority)?
253 .clone();
254
255 let endpoint = Endpoint::from_shared(uri.to_string())?.tcp_nodelay(true);
256 let endpoint = if scheme == "https" {
257 endpoint.tls_config(client_tls_config())?
258 } else {
259 endpoint
260 };
261 let channel = endpoint.connect().await?;
262 let clear_net_client = CompactTxStreamerClient::new(channel);
263
264 Ok(Self {
265 uri,
266 clear_net_client,
267 })
268 }
269
270 pub fn uri(&self) -> &http::Uri {
272 &self.uri
273 }
274
275 pub async fn get_clear_net_client(&self) -> CompactTxStreamerClient<Channel> {
277 self.clear_net_client.clone()
278 }
279}
280
281impl Indexer for GrpcIndexer {
282 async fn get_lightd_info(&mut self, timeout: Duration) -> Result<LightdInfo, tonic::Status> {
283 let mut request = Request::new(Empty {});
284 request.set_timeout(timeout);
285 Ok(self
286 .clear_net_client
287 .get_lightd_info(request)
288 .await?
289 .into_inner())
290 }
291
292 async fn get_latest_block(&mut self, timeout: Duration) -> Result<BlockId, tonic::Status> {
293 let mut request = Request::new(ChainSpec {});
294 request.set_timeout(timeout);
295 Ok(self
296 .clear_net_client
297 .get_latest_block(request)
298 .await?
299 .into_inner())
300 }
301
302 async fn send_transaction(
303 &mut self,
304 tx: RawTransaction,
305 timeout: Duration,
306 ) -> Result<String, tonic::Status> {
307 let mut request = Request::new(tx);
308 request.set_timeout(timeout);
309 let sendresponse = self
310 .clear_net_client
311 .send_transaction(request)
312 .await?
313 .into_inner();
314 if sendresponse.error_code == 0 {
315 let mut transaction_id = sendresponse.error_message;
316 if transaction_id.starts_with('\"') && transaction_id.ends_with('\"') {
317 transaction_id = transaction_id[1..transaction_id.len() - 1].to_string();
318 }
319 Ok(transaction_id)
320 } else {
321 Err(tonic::Status::new(
322 tonic::Code::Unknown,
323 sendresponse.error_message,
324 ))
325 }
326 }
327
328 async fn get_tree_state(
329 &mut self,
330 block_id: BlockId,
331 timeout: Duration,
332 ) -> Result<TreeState, tonic::Status> {
333 let mut request = Request::new(block_id);
334 request.set_timeout(timeout);
335 Ok(self
336 .clear_net_client
337 .get_tree_state(request)
338 .await?
339 .into_inner())
340 }
341
342 async fn get_block(
343 &mut self,
344 block_id: BlockId,
345 timeout: Duration,
346 ) -> Result<CompactBlock, tonic::Status> {
347 let mut request = Request::new(block_id);
348 request.set_timeout(timeout);
349 Ok(self.clear_net_client.get_block(request).await?.into_inner())
350 }
351
352 #[allow(deprecated)]
353 async fn get_block_nullifiers(
354 &mut self,
355 block_id: BlockId,
356 timeout: Duration,
357 ) -> Result<CompactBlock, tonic::Status> {
358 let mut request = Request::new(block_id);
359 request.set_timeout(timeout);
360 Ok(self
361 .clear_net_client
362 .get_block_nullifiers(request)
363 .await?
364 .into_inner())
365 }
366
367 async fn get_block_range(
368 &mut self,
369 range: BlockRange,
370 timeout: Duration,
371 ) -> Result<tonic::Streaming<CompactBlock>, tonic::Status> {
372 let mut request = Request::new(range);
373 request.set_timeout(timeout);
374 Ok(self
375 .clear_net_client
376 .get_block_range(request)
377 .await?
378 .into_inner())
379 }
380
381 #[allow(deprecated)]
382 async fn get_block_range_nullifiers(
383 &mut self,
384 range: BlockRange,
385 timeout: Duration,
386 ) -> Result<tonic::Streaming<CompactBlock>, tonic::Status> {
387 let mut request = Request::new(range);
388 request.set_timeout(timeout);
389 Ok(self
390 .clear_net_client
391 .get_block_range_nullifiers(request)
392 .await?
393 .into_inner())
394 }
395
396 async fn get_transaction(
397 &mut self,
398 filter: TxFilter,
399 timeout: Duration,
400 ) -> Result<RawTransaction, tonic::Status> {
401 let mut request = Request::new(filter);
402 request.set_timeout(timeout);
403 Ok(self
404 .clear_net_client
405 .get_transaction(request)
406 .await?
407 .into_inner())
408 }
409
410 async fn get_mempool_tx(
411 &mut self,
412 request: GetMempoolTxRequest,
413 timeout: Duration,
414 ) -> Result<tonic::Streaming<CompactTx>, tonic::Status> {
415 let mut request = Request::new(request);
416 request.set_timeout(timeout);
417 Ok(self
418 .clear_net_client
419 .get_mempool_tx(request)
420 .await?
421 .into_inner())
422 }
423
424 async fn get_mempool_stream(
425 &mut self,
426 timeout: Duration,
427 ) -> Result<tonic::Streaming<RawTransaction>, tonic::Status> {
428 let mut request = Request::new(Empty {});
429 request.set_timeout(timeout);
430 Ok(self
431 .clear_net_client
432 .get_mempool_stream(request)
433 .await?
434 .into_inner())
435 }
436
437 async fn get_latest_tree_state(
438 &mut self,
439 timeout: Duration,
440 ) -> Result<TreeState, tonic::Status> {
441 let mut request = Request::new(Empty {});
442 request.set_timeout(timeout);
443 Ok(self
444 .clear_net_client
445 .get_latest_tree_state(request)
446 .await?
447 .into_inner())
448 }
449
450 async fn get_subtree_roots(
451 &mut self,
452 arg: GetSubtreeRootsArg,
453 timeout: Duration,
454 ) -> Result<tonic::Streaming<SubtreeRoot>, tonic::Status> {
455 let mut request = Request::new(arg);
456 request.set_timeout(timeout);
457 Ok(self
458 .clear_net_client
459 .get_subtree_roots(request)
460 .await?
461 .into_inner())
462 }
463
464 #[cfg(feature = "ping-very-insecure")]
465 async fn ping(
466 &mut self,
467 duration: ProtoDuration,
468 timeout: Duration,
469 ) -> Result<PingResponse, tonic::Status> {
470 let mut request = Request::new(duration);
471 request.set_timeout(timeout);
472 Ok(self.clear_net_client.ping(request).await?.into_inner())
473 }
474}
475
476#[cfg(test)]
477mod tests {
478 use std::time::Duration;
493
494 use http::{Request, Response};
495 use hyper::{
496 body::{Bytes, Incoming},
497 service::service_fn,
498 };
499 use hyper_util::rt::TokioIo;
500 use tokio::{net::TcpListener, sync::oneshot, time::timeout};
501 use tokio_rustls::{TlsAcceptor, rustls};
502
503 use super::*;
504
505 use tokio_rustls::rustls::RootCertStore;
506
507 const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
508
509 fn add_test_cert_to_roots(roots: &mut RootCertStore) {
510 use tonic::transport::CertificateDer;
511 eprintln!("Adding test cert to roots");
512
513 const TEST_PEMFILE_PATH: &str = "test-data/localhost.pem";
514
515 let Ok(fd) = std::fs::File::open(TEST_PEMFILE_PATH) else {
516 eprintln!("Test TLS cert not found at {TEST_PEMFILE_PATH}, skipping");
517 return;
518 };
519
520 let mut buf = std::io::BufReader::new(fd);
521 let certs_bytes: Vec<tonic::transport::CertificateDer> = rustls_pemfile::certs(&mut buf)
522 .filter_map(Result::ok)
523 .collect();
524
525 let certs: Vec<CertificateDer<'_>> = certs_bytes.into_iter().collect();
526 roots.add_parsable_certificates(certs);
527 }
528
529 #[test]
536 fn localhost_cert_file_exists_and_is_parseable() {
537 const CERT_PATH: &str = "test-data/localhost.pem";
538
539 let pem = std::fs::read(CERT_PATH).expect("missing test-data/localhost.pem");
540
541 let mut cursor = std::io::BufReader::new(pem.as_slice());
542 let certs = rustls_pemfile::certs(&mut cursor)
543 .filter_map(Result::ok)
544 .collect::<Vec<_>>();
545
546 assert!(!certs.is_empty(), "no certs found in {CERT_PATH}");
547
548 for cert in certs {
549 let der = cert.as_ref();
550 let parsed = x509_parser::parse_x509_certificate(der);
551 assert!(
552 parsed.is_ok(),
553 "failed to parse a cert from {CERT_PATH} as X.509"
554 );
555 }
556 }
557
558 #[test]
564 fn localhost_cert_is_end_entity_not_ca() {
565 let pem =
566 std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
567 let mut cursor = std::io::BufReader::new(pem.as_slice());
568
569 let certs = rustls_pemfile::certs(&mut cursor)
570 .filter_map(Result::ok)
571 .collect::<Vec<_>>();
572
573 assert!(!certs.is_empty(), "no certs found in localhost.pem");
574
575 let der = certs[0].as_ref();
576 let parsed = x509_parser::parse_x509_certificate(der).expect("failed to parse X.509");
577 let x509 = parsed.1;
578
579 let constraints = x509
580 .basic_constraints()
581 .expect("missing basic constraints extension");
582
583 assert!(
584 !constraints.unwrap().value.ca,
585 "localhost.pem must be CA:FALSE"
586 );
587 }
588
589 fn load_test_server_config() -> std::sync::Arc<rustls::ServerConfig> {
596 let cert_pem =
597 std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
598 let key_pem =
599 std::fs::read("test-data/localhost.key").expect("missing test-data/localhost.key");
600
601 let mut cert_cursor = std::io::BufReader::new(cert_pem.as_slice());
602 let mut key_cursor = std::io::BufReader::new(key_pem.as_slice());
603
604 let certs = rustls_pemfile::certs(&mut cert_cursor)
605 .filter_map(Result::ok)
606 .collect::<Vec<_>>();
607
608 let key = rustls_pemfile::private_key(&mut key_cursor)
609 .expect("failed to read private key")
610 .expect("no private key found");
611
612 let config = rustls::ServerConfig::builder()
613 .with_no_client_auth()
614 .with_single_cert(certs, key)
615 .expect("bad cert or key");
616
617 std::sync::Arc::new(config)
618 }
619 #[tokio::test]
630 async fn add_test_cert_to_roots_enables_tls_handshake() {
631 use http_body_util::Full;
632 use hyper::service::service_fn;
633 use hyper_util::rt::TokioIo;
634 use tokio::net::TcpListener;
635 use tokio_rustls::TlsAcceptor;
636 use tokio_rustls::rustls;
637
638 let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
639 let addr = listener.local_addr().expect("local_addr failed");
640
641 let tls_config = load_test_server_config();
642 let acceptor = TlsAcceptor::from(tls_config);
643
644 let ready = oneshot::channel::<()>();
645 let ready_tx = ready.0;
646 let ready_rx = ready.1;
647
648 let server_task = tokio::spawn(async move {
649 let _ = ready_tx.send(());
650
651 let accept_res = timeout(Duration::from_secs(3), listener.accept()).await;
652 let (socket, _) = accept_res
653 .expect("server accept timed out")
654 .expect("accept failed");
655
656 let tls_stream = timeout(Duration::from_secs(3), acceptor.accept(socket))
657 .await
658 .expect("tls accept timed out")
659 .expect("tls accept failed");
660
661 let io = TokioIo::new(tls_stream);
662
663 let svc = service_fn(|mut req: http::Request<hyper::body::Incoming>| async move {
664 use http_body_util::BodyExt;
665
666 while let Some(frame) = req.body_mut().frame().await {
667 if frame.is_err() {
668 break;
669 }
670 }
671
672 let mut resp = http::Response::new(Full::new(Bytes::from_static(b"ok")));
673 resp.headers_mut().insert(
674 http::header::CONNECTION,
675 http::HeaderValue::from_static("close"),
676 );
677 Ok::<_, hyper::Error>(resp)
678 });
679
680 timeout(
681 Duration::from_secs(3),
682 hyper::server::conn::http1::Builder::new()
683 .keep_alive(false)
684 .serve_connection(io, svc),
685 )
686 .await
687 .expect("serve_connection timed out")
688 .expect("serve_connection failed");
689 });
690
691 timeout(Duration::from_secs(1), ready_rx)
692 .await
693 .expect("server ready signal timed out")
694 .expect("server dropped before ready");
695
696 let mut roots = rustls::RootCertStore::empty();
698 add_test_cert_to_roots(&mut roots);
699
700 let client_config = rustls::ClientConfig::builder()
701 .with_root_certificates(roots)
702 .with_no_client_auth();
703
704 let https = hyper_rustls::HttpsConnectorBuilder::new()
706 .with_tls_config(client_config)
707 .https_only()
708 .enable_http1()
709 .build();
710
711 let client =
712 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
713 .build(https);
714
715 let uri: http::Uri = format!("https://127.0.0.1:{}/", addr.port())
716 .parse()
717 .expect("bad uri");
718
719 let req = http::Request::builder()
720 .method("GET")
721 .uri(uri)
722 .body(Full::<Bytes>::new(Bytes::new()))
723 .expect("request build failed");
724
725 let res = timeout(Duration::from_secs(3), client.request(req))
726 .await
727 .expect("client request timed out")
728 .expect("TLS handshake or request failed");
729
730 assert!(res.status().is_success());
731
732 timeout(Duration::from_secs(3), server_task)
733 .await
734 .expect("server task timed out")
735 .expect("server task failed");
736 }
737
738 #[tokio::test]
743 async fn rejects_non_http_schemes() {
744 let uri: http::Uri = "ftp://example.com:1234".parse().unwrap();
745 let res = GrpcIndexer::new(uri).await;
746
747 assert!(
748 res.is_err(),
749 "expected GrpcIndexer::new() to reject non-http(s) schemes, but got Ok"
750 );
751 }
752
753 #[tokio::test]
759 async fn https_connector_must_not_downgrade_to_http1() {
760 use http_body_util::Full;
761
762 let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
763 let addr = listener.local_addr().expect("local_addr failed");
764
765 let tls_config = load_test_server_config();
766 let acceptor = TlsAcceptor::from(tls_config);
767
768 let server_task = tokio::spawn(async move {
769 let accept_res = timeout(Duration::from_secs(3), listener.accept()).await;
770 let (socket, _) = accept_res
771 .expect("server accept timed out")
772 .expect("accept failed");
773
774 let tls_stream = acceptor.accept(socket).await.expect("tls accept failed");
775 let io = TokioIo::new(tls_stream);
776
777 let svc = service_fn(|_req: Request<Incoming>| async move {
778 Ok::<_, hyper::Error>(Response::new(Full::new(Bytes::from_static(b"ok"))))
779 });
780
781 let _ = hyper::server::conn::http1::Builder::new()
784 .serve_connection(io, svc)
785 .await;
786 });
787
788 let base = format!("https://127.0.0.1:{}", addr.port());
789 let uri = base.parse::<http::Uri>().expect("bad base uri");
790
791 let endpoint = tonic::transport::Endpoint::from_shared(uri.to_string())
792 .expect("endpoint")
793 .tcp_nodelay(true);
794
795 let connect_res = endpoint
796 .tls_config(client_tls_config())
797 .expect("tls_config failed")
798 .connect()
799 .await;
800
801 assert!(
803 connect_res.is_err(),
804 "expected connect to fail (no downgrade to HTTP/1.1), but it succeeded"
805 );
806
807 server_task.abort();
808 }
809
810 #[tokio::test]
811 async fn connects_to_public_mainnet_indexer_and_gets_info() {
812 let endpoint = "https://zec.rocks:443".to_string();
813
814 let uri: http::Uri = endpoint.parse().expect("bad mainnet indexer URI");
815
816 let response = GrpcIndexer::new(uri)
817 .await
818 .expect("URI to be valid.")
819 .get_lightd_info(DEFAULT_TIMEOUT)
820 .await
821 .expect("to get info");
822 assert!(
823 !response.chain_name.is_empty(),
824 "chain_name should not be empty"
825 );
826 assert!(
827 response.block_height > 0,
828 "block_height should be > 0, got {}",
829 response.block_height
830 );
831
832 let chain = response.chain_name.to_ascii_lowercase();
833 assert!(
834 chain.contains("main"),
835 "expected a mainnet server, got chain_name={:?}",
836 response.chain_name
837 );
838 }
839
840 #[tokio::test]
848 async fn get_block_range_supports_descending_order() {
849 use tokio_stream::StreamExt;
850
851 let uri: http::Uri = "https://zec.rocks:443".parse().unwrap();
852 let mut indexer = GrpcIndexer::new(uri).await.expect("valid URI");
853
854 let tip = indexer
855 .get_latest_block(DEFAULT_TIMEOUT)
856 .await
857 .expect("get_latest_block");
858 let start_height = tip.height;
859 let end_height = start_height.saturating_sub(4);
860
861 let range = BlockRange {
863 start: Some(BlockId {
864 height: start_height,
865 hash: vec![],
866 }),
867 end: Some(BlockId {
868 height: end_height,
869 hash: vec![],
870 }),
871 pool_types: vec![],
872 };
873
874 let mut stream = indexer
875 .get_block_range(range, DEFAULT_TIMEOUT)
876 .await
877 .expect("get_block_range");
878
879 let mut heights = Vec::new();
880 while let Some(block) = stream.next().await {
881 let block = block.expect("stream item");
882 heights.push(block.height);
883 }
884
885 assert!(
886 !heights.is_empty(),
887 "expected at least one block in the descending range",
888 );
889
890 for window in heights.windows(2) {
893 assert!(
894 window[0] > window[1],
895 "expected descending order, but got heights: {heights:?}",
896 );
897 }
898 }
899}