use std::future::Future;
use std::time::Duration;
use tonic::Request;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
use lightwallet_protocol::{
BlockId, BlockRange, ChainSpec, CompactBlock, CompactTx, CompactTxStreamerClient, Empty,
GetMempoolTxRequest, GetSubtreeRootsArg, LightdInfo, RawTransaction, SubtreeRoot, TreeState,
TxFilter,
};
#[cfg(feature = "ping-very-insecure")]
use lightwallet_protocol::{Duration as ProtoDuration, PingResponse};
pub mod error;
pub use error::*;
pub use lightwallet_protocol;
pub use tonic::{Status, Streaming};
#[cfg(feature = "globally-public-transparent")]
mod globally_public;
#[cfg(feature = "globally-public-transparent")]
pub use globally_public::TransparentIndexer;
/// Builds the TLS configuration applied to `https` endpoints.
///
/// The configuration always ends with `with_webpki_roots()`. In test builds the certificate in
/// `test-data/localhost.pem` is registered as an additional CA certificate first.
///
/// # Panics
///
/// In test builds only: panics if `test-data/localhost.pem` cannot be read.
fn client_tls_config() -> ClientTlsConfig {
    let tls = ClientTlsConfig::new();

    // Test builds additionally trust the certificate shipped with the test fixtures.
    #[cfg(test)]
    let tls = tls.ca_certificate(tonic::transport::Certificate::from_pem(
        std::fs::read("test-data/localhost.pem").expect("test file"),
    ));

    tls.with_webpki_roots()
}
/// Asynchronous client interface for querying an indexer.
///
/// Every method takes a per-call `timeout` and returns a `Send` future that resolves to the
/// response payload, or to a [`tonic::Status`] on failure. Streaming calls resolve to a
/// [`tonic::Streaming`] of items instead of a single message.
///
/// [`GrpcIndexer`] implements this trait by forwarding each call to the same-named method of
/// `CompactTxStreamerClient`, applying `timeout` through `tonic::Request::set_timeout`.
pub trait Indexer {
/// Requests the server's [`LightdInfo`].
fn get_lightd_info(
&mut self,
timeout: Duration,
) -> impl Future<Output = Result<LightdInfo, tonic::Status>> + Send;
/// Requests the [`BlockId`] the server reports for its latest block.
fn get_latest_block(
&mut self,
timeout: Duration,
) -> impl Future<Output = Result<BlockId, tonic::Status>> + Send;
/// Submits `tx` to the server and resolves to a `String` on success.
///
/// In the [`GrpcIndexer`] implementation a response with `error_code == 0` counts as success
/// and its message is returned as the transaction id; any other code becomes a
/// [`tonic::Status`] with code `Unknown` carrying the server's message.
fn send_transaction(
&mut self,
tx: RawTransaction,
timeout: Duration,
) -> impl Future<Output = Result<String, tonic::Status>> + Send;
/// Requests the [`TreeState`] for the block identified by `block_id`.
fn get_tree_state(
&mut self,
block_id: BlockId,
timeout: Duration,
) -> impl Future<Output = Result<TreeState, tonic::Status>> + Send;
/// Requests the [`CompactBlock`] identified by `block_id`.
fn get_block(
&mut self,
block_id: BlockId,
timeout: Duration,
) -> impl Future<Output = Result<CompactBlock, tonic::Status>> + Send;
/// Same parameters and return type as [`Indexer::get_block`]; deprecated in favour of it.
#[deprecated(note = "use get_block instead")]
fn get_block_nullifiers(
&mut self,
block_id: BlockId,
timeout: Duration,
) -> impl Future<Output = Result<CompactBlock, tonic::Status>> + Send;
/// Opens a stream of [`CompactBlock`]s for `range`.
fn get_block_range(
&mut self,
range: BlockRange,
timeout: Duration,
) -> impl Future<Output = Result<tonic::Streaming<CompactBlock>, tonic::Status>> + Send;
/// Same parameters and return type as [`Indexer::get_block_range`]; deprecated in favour of it.
#[deprecated(note = "use get_block_range instead")]
fn get_block_range_nullifiers(
&mut self,
range: BlockRange,
timeout: Duration,
) -> impl Future<Output = Result<tonic::Streaming<CompactBlock>, tonic::Status>> + Send;
/// Requests the [`RawTransaction`] selected by `filter`.
fn get_transaction(
&mut self,
filter: TxFilter,
timeout: Duration,
) -> impl Future<Output = Result<RawTransaction, tonic::Status>> + Send;
/// Opens a stream of [`CompactTx`] items for the given mempool `request`.
fn get_mempool_tx(
&mut self,
request: GetMempoolTxRequest,
timeout: Duration,
) -> impl Future<Output = Result<tonic::Streaming<CompactTx>, tonic::Status>> + Send;
/// Opens a stream of [`RawTransaction`]s from the server's mempool stream call.
fn get_mempool_stream(
&mut self,
timeout: Duration,
) -> impl Future<Output = Result<tonic::Streaming<RawTransaction>, tonic::Status>> + Send;
/// Requests the latest [`TreeState`]; unlike [`Indexer::get_tree_state`] no block id is given.
fn get_latest_tree_state(
&mut self,
timeout: Duration,
) -> impl Future<Output = Result<TreeState, tonic::Status>> + Send;
/// Opens a stream of [`SubtreeRoot`]s selected by `arg`.
fn get_subtree_roots(
&mut self,
arg: GetSubtreeRootsArg,
timeout: Duration,
) -> impl Future<Output = Result<tonic::Streaming<SubtreeRoot>, tonic::Status>> + Send;
/// Sends `duration` to the server's ping call and resolves to its [`PingResponse`].
///
/// Only compiled when the `ping-very-insecure` feature is enabled.
#[cfg(feature = "ping-very-insecure")]
fn ping(
&mut self,
duration: ProtoDuration,
timeout: Duration,
) -> impl Future<Output = Result<PingResponse, tonic::Status>> + Send;
}
/// [`Indexer`] implementation backed by a tonic gRPC channel.
///
/// Built with [`GrpcIndexer::new`]; keeps the URI it was created from alongside the connected
/// `CompactTxStreamerClient`.
#[derive(Debug, Clone)]
pub struct GrpcIndexer {
/// URI passed to `new`; exposed read-only through `uri()`.
uri: http::Uri,
/// Client wrapping the channel that `new` connected.
clear_net_client: CompactTxStreamerClient<Channel>,
}
impl GrpcIndexer {
    /// Connects to the indexer at `uri` and returns a client wrapping that connection.
    ///
    /// The endpoint is created with `TCP_NODELAY` enabled. For `https` URIs the TLS
    /// configuration from `client_tls_config` is applied before connecting; `http` URIs are
    /// connected without TLS.
    ///
    /// # Errors
    ///
    /// * `GetClientError::InvalidScheme` if the URI has no scheme or a scheme other than
    ///   `http` / `https`.
    /// * `GetClientError::InvalidAuthority` if the URI has no authority component.
    /// * Whatever `GetClientError` the endpoint construction, TLS setup or connection errors
    ///   convert into.
    pub async fn new(uri: http::Uri) -> Result<Self, GetClientError> {
        // The scheme is validated first, then the authority.
        let use_tls = match uri.scheme_str() {
            Some("https") => true,
            Some("http") => false,
            _ => return Err(GetClientError::InvalidScheme),
        };
        if uri.authority().is_none() {
            return Err(GetClientError::InvalidAuthority);
        }

        let mut endpoint = Endpoint::from_shared(uri.to_string())?.tcp_nodelay(true);
        if use_tls {
            endpoint = endpoint.tls_config(client_tls_config())?;
        }

        let clear_net_client = CompactTxStreamerClient::new(endpoint.connect().await?);
        Ok(Self {
            uri,
            clear_net_client,
        })
    }

    /// Returns the URI this indexer was created with.
    pub fn uri(&self) -> &http::Uri {
        &self.uri
    }

    /// Returns a clone of the underlying `CompactTxStreamerClient`.
    ///
    /// The function is `async` but awaits nothing.
    pub async fn get_clear_net_client(&self) -> CompactTxStreamerClient<Channel> {
        self.clear_net_client.clone()
    }
}
impl Indexer for GrpcIndexer {
async fn get_lightd_info(&mut self, timeout: Duration) -> Result<LightdInfo, tonic::Status> {
let mut request = Request::new(Empty {});
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_lightd_info(request)
.await?
.into_inner())
}
async fn get_latest_block(&mut self, timeout: Duration) -> Result<BlockId, tonic::Status> {
let mut request = Request::new(ChainSpec {});
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_latest_block(request)
.await?
.into_inner())
}
async fn send_transaction(
&mut self,
tx: RawTransaction,
timeout: Duration,
) -> Result<String, tonic::Status> {
let mut request = Request::new(tx);
request.set_timeout(timeout);
let sendresponse = self
.clear_net_client
.send_transaction(request)
.await?
.into_inner();
if sendresponse.error_code == 0 {
let mut transaction_id = sendresponse.error_message;
if transaction_id.starts_with('\"') && transaction_id.ends_with('\"') {
transaction_id = transaction_id[1..transaction_id.len() - 1].to_string();
}
Ok(transaction_id)
} else {
Err(tonic::Status::new(
tonic::Code::Unknown,
sendresponse.error_message,
))
}
}
async fn get_tree_state(
&mut self,
block_id: BlockId,
timeout: Duration,
) -> Result<TreeState, tonic::Status> {
let mut request = Request::new(block_id);
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_tree_state(request)
.await?
.into_inner())
}
async fn get_block(
&mut self,
block_id: BlockId,
timeout: Duration,
) -> Result<CompactBlock, tonic::Status> {
let mut request = Request::new(block_id);
request.set_timeout(timeout);
Ok(self.clear_net_client.get_block(request).await?.into_inner())
}
#[allow(deprecated)]
async fn get_block_nullifiers(
&mut self,
block_id: BlockId,
timeout: Duration,
) -> Result<CompactBlock, tonic::Status> {
let mut request = Request::new(block_id);
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_block_nullifiers(request)
.await?
.into_inner())
}
async fn get_block_range(
&mut self,
range: BlockRange,
timeout: Duration,
) -> Result<tonic::Streaming<CompactBlock>, tonic::Status> {
let mut request = Request::new(range);
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_block_range(request)
.await?
.into_inner())
}
#[allow(deprecated)]
async fn get_block_range_nullifiers(
&mut self,
range: BlockRange,
timeout: Duration,
) -> Result<tonic::Streaming<CompactBlock>, tonic::Status> {
let mut request = Request::new(range);
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_block_range_nullifiers(request)
.await?
.into_inner())
}
async fn get_transaction(
&mut self,
filter: TxFilter,
timeout: Duration,
) -> Result<RawTransaction, tonic::Status> {
let mut request = Request::new(filter);
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_transaction(request)
.await?
.into_inner())
}
async fn get_mempool_tx(
&mut self,
request: GetMempoolTxRequest,
timeout: Duration,
) -> Result<tonic::Streaming<CompactTx>, tonic::Status> {
let mut request = Request::new(request);
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_mempool_tx(request)
.await?
.into_inner())
}
async fn get_mempool_stream(
&mut self,
timeout: Duration,
) -> Result<tonic::Streaming<RawTransaction>, tonic::Status> {
let mut request = Request::new(Empty {});
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_mempool_stream(request)
.await?
.into_inner())
}
async fn get_latest_tree_state(
&mut self,
timeout: Duration,
) -> Result<TreeState, tonic::Status> {
let mut request = Request::new(Empty {});
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_latest_tree_state(request)
.await?
.into_inner())
}
async fn get_subtree_roots(
&mut self,
arg: GetSubtreeRootsArg,
timeout: Duration,
) -> Result<tonic::Streaming<SubtreeRoot>, tonic::Status> {
let mut request = Request::new(arg);
request.set_timeout(timeout);
Ok(self
.clear_net_client
.get_subtree_roots(request)
.await?
.into_inner())
}
#[cfg(feature = "ping-very-insecure")]
async fn ping(
&mut self,
duration: ProtoDuration,
timeout: Duration,
) -> Result<PingResponse, tonic::Status> {
let mut request = Request::new(duration);
request.set_timeout(timeout);
Ok(self.clear_net_client.ping(request).await?.into_inner())
}
}
/// Tests for the TLS configuration and the `GrpcIndexer` client.
///
/// The certificate tests read fixtures from `test-data/`. The last two tests connect to
/// `https://zec.rocks:443`.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use http::{Request, Response};
    use hyper::{
        body::{Bytes, Incoming},
        service::service_fn,
    };
    use hyper_util::rt::TokioIo;
    use tokio::{net::TcpListener, sync::oneshot, time::timeout};
    use tokio_rustls::{TlsAcceptor, rustls};

    use super::*;
    use tokio_rustls::rustls::RootCertStore;

    /// Per-call timeout used by the tests that talk to a real indexer.
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

    /// Adds every parseable certificate from `test-data/localhost.pem` to `roots`.
    ///
    /// Best-effort: a missing file is logged to stderr and skipped, and PEM entries that fail
    /// to decode are ignored.
    fn add_test_cert_to_roots(roots: &mut RootCertStore) {
        use tonic::transport::CertificateDer;

        eprintln!("Adding test cert to roots");

        const TEST_PEMFILE_PATH: &str = "test-data/localhost.pem";

        let Ok(fd) = std::fs::File::open(TEST_PEMFILE_PATH) else {
            eprintln!("Test TLS cert not found at {TEST_PEMFILE_PATH}, skipping");
            return;
        };

        let mut buf = std::io::BufReader::new(fd);
        let certs: Vec<CertificateDer<'_>> = rustls_pemfile::certs(&mut buf)
            .filter_map(Result::ok)
            .collect();
        roots.add_parsable_certificates(certs);
    }

    /// The fixture certificate file exists, contains at least one certificate, and every
    /// certificate in it parses as X.509.
    #[test]
    fn localhost_cert_file_exists_and_is_parseable() {
        const CERT_PATH: &str = "test-data/localhost.pem";

        let pem = std::fs::read(CERT_PATH).expect("missing test-data/localhost.pem");
        let mut cursor = std::io::BufReader::new(pem.as_slice());

        let certs = rustls_pemfile::certs(&mut cursor)
            .filter_map(Result::ok)
            .collect::<Vec<_>>();
        assert!(!certs.is_empty(), "no certs found in {CERT_PATH}");

        for cert in certs {
            let der = cert.as_ref();
            let parsed = x509_parser::parse_x509_certificate(der);
            assert!(
                parsed.is_ok(),
                "failed to parse a cert from {CERT_PATH} as X.509"
            );
        }
    }

    /// The first certificate in the fixture file carries a basic-constraints extension whose
    /// `ca` flag is false.
    #[test]
    fn localhost_cert_is_end_entity_not_ca() {
        let pem =
            std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
        let mut cursor = std::io::BufReader::new(pem.as_slice());

        let certs = rustls_pemfile::certs(&mut cursor)
            .filter_map(Result::ok)
            .collect::<Vec<_>>();
        assert!(!certs.is_empty(), "no certs found in localhost.pem");

        let der = certs[0].as_ref();
        let parsed = x509_parser::parse_x509_certificate(der).expect("failed to parse X.509");
        let x509 = parsed.1;

        // The lookup is unwrapped in two steps so a failed lookup and an absent extension are
        // reported with distinct messages.
        let constraints = x509
            .basic_constraints()
            .expect("failed to read basic constraints extension")
            .expect("missing basic constraints extension");

        assert!(!constraints.value.ca, "localhost.pem must be CA:FALSE");
    }

    /// Builds a rustls server configuration (no client auth) from the fixture certificate and
    /// private key in `test-data/`.
    fn load_test_server_config() -> std::sync::Arc<rustls::ServerConfig> {
        let cert_pem =
            std::fs::read("test-data/localhost.pem").expect("missing test-data/localhost.pem");
        let key_pem =
            std::fs::read("test-data/localhost.key").expect("missing test-data/localhost.key");

        let mut cert_cursor = std::io::BufReader::new(cert_pem.as_slice());
        let mut key_cursor = std::io::BufReader::new(key_pem.as_slice());

        let certs = rustls_pemfile::certs(&mut cert_cursor)
            .filter_map(Result::ok)
            .collect::<Vec<_>>();

        let key = rustls_pemfile::private_key(&mut key_cursor)
            .expect("failed to read private key")
            .expect("no private key found");

        let config = rustls::ServerConfig::builder()
            .with_no_client_auth()
            .with_single_cert(certs, key)
            .expect("bad cert or key");

        std::sync::Arc::new(config)
    }

    /// A client whose only trust roots come from `add_test_cert_to_roots` can complete an HTTPS
    /// request against a local server presenting the fixture certificate.
    #[tokio::test]
    async fn add_test_cert_to_roots_enables_tls_handshake() {
        use http_body_util::Full;

        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
        let addr = listener.local_addr().expect("local_addr failed");

        let tls_config = load_test_server_config();
        let acceptor = TlsAcceptor::from(tls_config);

        let (ready_tx, ready_rx) = oneshot::channel::<()>();

        // Serves exactly one HTTP/1 connection over TLS, then finishes.
        let server_task = tokio::spawn(async move {
            let _ = ready_tx.send(());

            let accept_res = timeout(Duration::from_secs(3), listener.accept()).await;
            let (socket, _) = accept_res
                .expect("server accept timed out")
                .expect("accept failed");

            let tls_stream = timeout(Duration::from_secs(3), acceptor.accept(socket))
                .await
                .expect("tls accept timed out")
                .expect("tls accept failed");

            let io = TokioIo::new(tls_stream);

            let svc = service_fn(|mut req: http::Request<hyper::body::Incoming>| async move {
                use http_body_util::BodyExt;

                // Drain the request body before answering.
                while let Some(frame) = req.body_mut().frame().await {
                    if frame.is_err() {
                        break;
                    }
                }

                let mut resp = http::Response::new(Full::new(Bytes::from_static(b"ok")));
                resp.headers_mut().insert(
                    http::header::CONNECTION,
                    http::HeaderValue::from_static("close"),
                );
                Ok::<_, hyper::Error>(resp)
            });

            timeout(
                Duration::from_secs(3),
                hyper::server::conn::http1::Builder::new()
                    .keep_alive(false)
                    .serve_connection(io, svc),
            )
            .await
            .expect("serve_connection timed out")
            .expect("serve_connection failed");
        });

        timeout(Duration::from_secs(1), ready_rx)
            .await
            .expect("server ready signal timed out")
            .expect("server dropped before ready");

        // The root store starts empty, so the fixture certificate is the only trust anchor.
        let mut roots = rustls::RootCertStore::empty();
        add_test_cert_to_roots(&mut roots);

        let client_config = rustls::ClientConfig::builder()
            .with_root_certificates(roots)
            .with_no_client_auth();

        let https = hyper_rustls::HttpsConnectorBuilder::new()
            .with_tls_config(client_config)
            .https_only()
            .enable_http1()
            .build();

        let client =
            hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
                .build(https);

        let uri: http::Uri = format!("https://127.0.0.1:{}/", addr.port())
            .parse()
            .expect("bad uri");

        let req = http::Request::builder()
            .method("GET")
            .uri(uri)
            .body(Full::<Bytes>::new(Bytes::new()))
            .expect("request build failed");

        let res = timeout(Duration::from_secs(3), client.request(req))
            .await
            .expect("client request timed out")
            .expect("TLS handshake or request failed");

        assert!(res.status().is_success());

        timeout(Duration::from_secs(3), server_task)
            .await
            .expect("server task timed out")
            .expect("server task failed");
    }

    /// `GrpcIndexer::new` returns an error for a URI whose scheme is neither `http` nor `https`.
    #[tokio::test]
    async fn rejects_non_http_schemes() {
        let uri: http::Uri = "ftp://example.com:1234".parse().unwrap();
        let res = GrpcIndexer::new(uri).await;
        assert!(
            res.is_err(),
            "expected GrpcIndexer::new() to reject non-http(s) schemes, but got Ok"
        );
    }

    /// Connecting a tonic endpoint configured with `client_tls_config` to a TLS server that
    /// serves connections with hyper's HTTP/1 builder must fail.
    #[tokio::test]
    async fn https_connector_must_not_downgrade_to_http1() {
        use http_body_util::Full;

        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind failed");
        let addr = listener.local_addr().expect("local_addr failed");

        let tls_config = load_test_server_config();
        let acceptor = TlsAcceptor::from(tls_config);

        let server_task = tokio::spawn(async move {
            let accept_res = timeout(Duration::from_secs(3), listener.accept()).await;
            let (socket, _) = accept_res
                .expect("server accept timed out")
                .expect("accept failed");

            let tls_stream = acceptor.accept(socket).await.expect("tls accept failed");
            let io = TokioIo::new(tls_stream);

            let svc = service_fn(|_req: Request<Incoming>| async move {
                Ok::<_, hyper::Error>(Response::new(Full::new(Bytes::from_static(b"ok"))))
            });

            // The outcome of serving is irrelevant here; only the client side is asserted on.
            let _ = hyper::server::conn::http1::Builder::new()
                .serve_connection(io, svc)
                .await;
        });

        let base = format!("https://127.0.0.1:{}", addr.port());
        let uri = base.parse::<http::Uri>().expect("bad base uri");

        let endpoint = tonic::transport::Endpoint::from_shared(uri.to_string())
            .expect("endpoint")
            .tcp_nodelay(true);

        let connect_res = endpoint
            .tls_config(client_tls_config())
            .expect("tls_config failed")
            .connect()
            .await;

        assert!(
            connect_res.is_err(),
            "expected connect to fail (no downgrade to HTTP/1.1), but it succeeded"
        );

        server_task.abort();
    }

    /// Connects to `https://zec.rocks:443` and checks that `get_lightd_info` reports a
    /// non-empty chain name containing "main" and a positive block height.
    #[tokio::test]
    async fn connects_to_public_mainnet_indexer_and_gets_info() {
        let uri: http::Uri = "https://zec.rocks:443"
            .parse()
            .expect("bad mainnet indexer URI");

        let response = GrpcIndexer::new(uri)
            .await
            .expect("URI to be valid.")
            .get_lightd_info(DEFAULT_TIMEOUT)
            .await
            .expect("to get info");

        assert!(
            !response.chain_name.is_empty(),
            "chain_name should not be empty"
        );
        assert!(
            response.block_height > 0,
            "block_height should be > 0, got {}",
            response.block_height
        );

        let chain = response.chain_name.to_ascii_lowercase();
        assert!(
            chain.contains("main"),
            "expected a mainnet server, got chain_name={:?}",
            response.chain_name
        );
    }

    /// Requests a block range whose start height is above its end height from
    /// `https://zec.rocks:443` and checks that the streamed heights are strictly decreasing.
    #[tokio::test]
    async fn get_block_range_supports_descending_order() {
        use tokio_stream::StreamExt;

        let uri: http::Uri = "https://zec.rocks:443".parse().unwrap();
        let mut indexer = GrpcIndexer::new(uri).await.expect("valid URI");

        let tip = indexer
            .get_latest_block(DEFAULT_TIMEOUT)
            .await
            .expect("get_latest_block");

        // Start at the reported tip and end four blocks below it.
        let start_height = tip.height;
        let end_height = start_height.saturating_sub(4);

        let range = BlockRange {
            start: Some(BlockId {
                height: start_height,
                hash: vec![],
            }),
            end: Some(BlockId {
                height: end_height,
                hash: vec![],
            }),
            pool_types: vec![],
        };

        let mut stream = indexer
            .get_block_range(range, DEFAULT_TIMEOUT)
            .await
            .expect("get_block_range");

        let mut heights = Vec::new();
        while let Some(block) = stream.next().await {
            let block = block.expect("stream item");
            heights.push(block.height);
        }

        assert!(
            !heights.is_empty(),
            "expected at least one block in the descending range",
        );

        for window in heights.windows(2) {
            assert!(
                window[0] > window[1],
                "expected descending order, but got heights: {heights:?}",
            );
        }
    }
}