1#![warn(missing_docs)]
7use std::sync::Arc;
8
9use client::client_from_connector;
10use http::{Uri, uri::PathAndQuery};
11use hyper_util::client::legacy::connect::HttpConnector;
12use tokio_rustls::rustls::pki_types::{Der, TrustAnchor};
13use tokio_rustls::rustls::{ClientConfig, RootCertStore};
14use tower::ServiceExt;
15use tower::util::BoxCloneService;
16use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
17
18pub type UnderlyingService = BoxCloneService<
20 http::Request<tonic::body::Body>,
21 http::Response<hyper::body::Incoming>,
22 hyper_util::client::legacy::Error,
23>;
24
25#[allow(missing_docs)] #[derive(Debug, thiserror::Error)]
27pub enum GetClientError {
28 #[error("bad uri: invalid scheme")]
29 InvalidScheme,
30 #[error("bad uri: invalid authority")]
31 InvalidAuthority,
32 #[error("bad uri: invalid path and/or query")]
33 InvalidPathAndQuery,
34}
35
36pub mod client {
38 use http_body::Body;
39 use hyper_util::client::legacy::{Client, connect::Connect};
40 pub fn client_from_connector<C, B>(connector: C, http2_only: bool) -> Box<Client<C, B>>
42 where
43 C: Connect + Clone,
44 B: Body + Send,
45 B::Data: Send,
46 {
47 Box::new(
48 Client::builder(hyper_util::rt::TokioExecutor::new())
49 .http2_only(http2_only)
50 .build(connector),
51 )
52 }
53}
54
55#[derive(Clone)]
60pub struct GrpcConnector {
61 uri: http::Uri,
62}
63
64impl GrpcConnector {
65 pub fn new(uri: http::Uri) -> Self {
67 Self { uri }
68 }
69
70 pub fn uri(&self) -> &Uri {
72 &self.uri
73 }
74
75 pub fn get_client(
79 &self,
80 ) -> impl std::future::Future<
81 Output = Result<CompactTxStreamerClient<UnderlyingService>, GetClientError>,
82 > {
83 let uri = Arc::new(self.uri.clone());
84 async move {
85 let mut http_connector = HttpConnector::new();
86 http_connector.enforce_http(false);
87 let scheme = uri.scheme().ok_or(GetClientError::InvalidScheme)?.clone();
88 let authority = uri
89 .authority()
90 .ok_or(GetClientError::InvalidAuthority)?
91 .clone();
92 if uri.scheme_str() == Some("https") {
93 let mut root_store = RootCertStore::empty();
94 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().map(|anchor_ref| {
96 TrustAnchor {
97 subject: Der::from_slice(anchor_ref.subject),
98 subject_public_key_info: Der::from_slice(anchor_ref.spki),
99 name_constraints: anchor_ref.name_constraints.map(Der::from_slice),
100 }
101 }));
102
103 #[cfg(test)]
104 add_test_cert_to_roots(&mut root_store);
105
106 let config = ClientConfig::builder()
107 .with_root_certificates(root_store)
108 .with_no_client_auth();
109
110 let connector = tower::ServiceBuilder::new()
111 .layer_fn(move |s| {
112 let tls = config.clone();
113
114 hyper_rustls::HttpsConnectorBuilder::new()
115 .with_tls_config(tls)
116 .https_or_http()
117 .enable_http2()
118 .wrap_connector(s)
119 })
120 .service(http_connector);
121 let client = client_from_connector(connector, false);
122 let svc = tower::ServiceBuilder::new()
123 .map_request(move |mut request: http::Request<_>| {
125 let path_and_query = request
126 .uri()
127 .path_and_query()
128 .cloned()
129 .unwrap_or(PathAndQuery::from_static("/"));
130 let uri = Uri::builder()
131 .scheme(scheme.clone())
132 .authority(authority.clone())
133 .path_and_query(path_and_query)
136 .build()
137 .unwrap();
138
139 *request.uri_mut() = uri;
140 request
141 })
142 .service(client);
143
144 Ok(CompactTxStreamerClient::new(svc.boxed_clone()))
145 } else {
146 let connector = tower::ServiceBuilder::new().service(http_connector);
147 let client = client_from_connector(connector, true);
148 let svc = tower::ServiceBuilder::new()
149 .map_request(move |mut request: http::Request<_>| {
151 let path_and_query = request
152 .uri()
153 .path_and_query()
154 .cloned()
155 .unwrap_or(PathAndQuery::from_static("/"));
156 let uri = Uri::builder()
157 .scheme(scheme.clone())
158 .authority(authority.clone())
159 .path_and_query(path_and_query)
162 .build()
163 .unwrap();
164
165 *request.uri_mut() = uri;
166 request
167 })
168 .service(client);
169
170 Ok(CompactTxStreamerClient::new(svc.boxed_clone()))
171 }
172 }
173 }
174}
175
176#[cfg(test)]
177fn add_test_cert_to_roots(roots: &mut RootCertStore) {
178 use tonic::transport::CertificateDer;
179
180 const TEST_PEMFILE_PATH: &str = "test-data/localhost.pem";
181 let fd = std::fs::File::open(TEST_PEMFILE_PATH).unwrap();
182 let mut buf = std::io::BufReader::new(&fd);
183 let certs_bytes: Vec<tonic::transport::CertificateDer> = rustls_pemfile::certs(&mut buf)
184 .filter_map(Result::ok)
185 .collect();
186 let certs: Vec<CertificateDer<'_>> = certs_bytes.into_iter().collect();
187
188 roots.add_parsable_certificates(certs);
189}