1#![warn(missing_docs)]
7use std::sync::Arc;
8
9use client::client_from_connector;
10use http::{Uri, uri::PathAndQuery};
11use http_body_util::combinators::UnsyncBoxBody;
12use hyper_util::client::legacy::connect::HttpConnector;
13use tokio_rustls::rustls::pki_types::{Der, TrustAnchor};
14use tokio_rustls::rustls::{ClientConfig, RootCertStore};
15use tonic::Status;
16use tower::ServiceExt;
17use tower::util::BoxCloneService;
18use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
19
20pub type UnderlyingService = BoxCloneService<
22 http::Request<UnsyncBoxBody<prost::bytes::Bytes, Status>>,
23 http::Response<hyper::body::Incoming>,
24 hyper_util::client::legacy::Error,
25>;
26
27#[allow(missing_docs)] #[derive(Debug, thiserror::Error)]
29pub enum GetClientError {
30 #[error("bad uri: invalid scheme")]
31 InvalidScheme,
32 #[error("bad uri: invalid authority")]
33 InvalidAuthority,
34 #[error("bad uri: invalid path and/or query")]
35 InvalidPathAndQuery,
36}
37
38pub mod client {
40 use http_body::Body;
41 use hyper_util::client::legacy::{Client, connect::Connect};
42 pub fn client_from_connector<C, B>(connector: C, http2_only: bool) -> Box<Client<C, B>>
44 where
45 C: Connect + Clone,
46 B: Body + Send,
47 B::Data: Send,
48 {
49 Box::new(
50 Client::builder(hyper_util::rt::TokioExecutor::new())
51 .http2_only(http2_only)
52 .build(connector),
53 )
54 }
55}
56
57#[derive(Clone)]
62pub struct GrpcConnector {
63 uri: http::Uri,
64}
65
66impl GrpcConnector {
67 pub fn new(uri: http::Uri) -> Self {
69 Self { uri }
70 }
71
72 pub fn uri(&self) -> &Uri {
74 &self.uri
75 }
76
77 pub fn get_client(
81 &self,
82 ) -> impl std::future::Future<
83 Output = Result<CompactTxStreamerClient<UnderlyingService>, GetClientError>,
84 > {
85 let uri = Arc::new(self.uri.clone());
86 async move {
87 let mut http_connector = HttpConnector::new();
88 http_connector.enforce_http(false);
89 let scheme = uri.scheme().ok_or(GetClientError::InvalidScheme)?.clone();
90 let authority = uri
91 .authority()
92 .ok_or(GetClientError::InvalidAuthority)?
93 .clone();
94 if uri.scheme_str() == Some("https") {
95 let mut root_store = RootCertStore::empty();
96 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().map(|anchor_ref| {
98 TrustAnchor {
99 subject: Der::from_slice(anchor_ref.subject),
100 subject_public_key_info: Der::from_slice(anchor_ref.spki),
101 name_constraints: anchor_ref.name_constraints.map(Der::from_slice),
102 }
103 }));
104
105 #[cfg(test)]
106 add_test_cert_to_roots(&mut root_store);
107
108 let config = ClientConfig::builder()
109 .with_root_certificates(root_store)
110 .with_no_client_auth();
111
112 let connector = tower::ServiceBuilder::new()
113 .layer_fn(move |s| {
114 let tls = config.clone();
115
116 hyper_rustls::HttpsConnectorBuilder::new()
117 .with_tls_config(tls)
118 .https_or_http()
119 .enable_http2()
120 .wrap_connector(s)
121 })
122 .service(http_connector);
123 let client = client_from_connector(connector, false);
124 let svc = tower::ServiceBuilder::new()
125 .map_request(move |mut request: http::Request<tonic::body::BoxBody>| {
127 let path_and_query = request
128 .uri()
129 .path_and_query()
130 .cloned()
131 .unwrap_or(PathAndQuery::from_static("/"));
132 let uri = Uri::builder()
133 .scheme(scheme.clone())
134 .authority(authority.clone())
135 .path_and_query(path_and_query)
138 .build()
139 .unwrap();
140
141 *request.uri_mut() = uri;
142 request
143 })
144 .service(client);
145
146 Ok(CompactTxStreamerClient::new(svc.boxed_clone()))
147 } else {
148 let connector = tower::ServiceBuilder::new().service(http_connector);
149 let client = client_from_connector(connector, true);
150 let svc = tower::ServiceBuilder::new()
151 .map_request(move |mut request: http::Request<tonic::body::BoxBody>| {
153 let path_and_query = request
154 .uri()
155 .path_and_query()
156 .cloned()
157 .unwrap_or(PathAndQuery::from_static("/"));
158 let uri = Uri::builder()
159 .scheme(scheme.clone())
160 .authority(authority.clone())
161 .path_and_query(path_and_query)
164 .build()
165 .unwrap();
166
167 *request.uri_mut() = uri;
168 request
169 })
170 .service(client);
171
172 Ok(CompactTxStreamerClient::new(svc.boxed_clone()))
173 }
174 }
175 }
176}
177
178#[cfg(test)]
179fn add_test_cert_to_roots(roots: &mut RootCertStore) {
180 use tonic::transport::CertificateDer;
181
182 const TEST_PEMFILE_PATH: &str = "test-data/localhost.pem";
183 let fd = std::fs::File::open(TEST_PEMFILE_PATH).unwrap();
184 let mut buf = std::io::BufReader::new(&fd);
185 let certs_bytes: Vec<tonic::transport::CertificateDer> = rustls_pemfile::certs(&mut buf)
186 .filter_map(Result::ok)
187 .collect();
188 let certs: Vec<CertificateDer<'_>> = certs_bytes.into_iter().collect();
189
190 roots.add_parsable_certificates(certs);
191}