1use std::time::Duration;
5
6use backoff::ExponentialBackoff;
7use http::{uri::InvalidUri, Uri};
8use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
9use hyper_socks2::{Auth, SocksConnector};
10use hyper_util::client::legacy::connect::HttpConnector;
11use tonic::{
12 body::Body,
13 client::GrpcService,
14 transport::{Channel, ClientTlsConfig, Endpoint},
15};
16use tower::{Layer, ServiceBuilder};
17use url::Url;
18
19use qcs_api_client_common::{
20 backoff::{self, default_backoff},
21 configuration::{tokens::TokenRefresher, ClientConfiguration, LoadError, TokenError},
22};
23
24#[cfg(feature = "tracing")]
25use qcs_api_client_common::tracing_configuration::TracingConfiguration;
26
27#[cfg(feature = "tracing")]
28use super::trace::{build_trace_layer, CustomTraceLayer, CustomTraceService};
29use super::{Error, RefreshLayer, RefreshService, RetryLayer, RetryService};
30
31#[derive(Debug, thiserror::Error)]
33#[non_exhaustive]
34pub enum ChannelError {
35 #[error("Failed to parse URI: {0}")]
37 InvalidUri(#[from] InvalidUri),
38 #[error("Failed to parse URL: {0}")]
40 InvalidUrl(#[from] url::ParseError),
41 #[error("Protocol is missing or not supported: {0:?}")]
43 UnsupportedProtocol(Option<String>),
44 #[error("HTTP proxy ssl verification failed: {0}")]
46 SslFailure(#[from] std::io::Error),
47 #[error("Cannot set separate https and http proxies if one of them is socks5")]
49 Mismatch {
50 https_proxy: Uri,
52 http_proxy: Uri,
54 },
55}
56
57pub trait IntoService<C: GrpcService<Body>> {
59 type Service: GrpcService<Body>;
61
62 fn into_service(self, channel: C) -> Self::Service;
64}
65
66impl<C> IntoService<C> for ()
67where
68 C: GrpcService<Body>,
69{
70 type Service = C;
71 fn into_service(self, channel: C) -> Self::Service {
72 channel
73 }
74}
75
76#[derive(Clone, Debug)]
78pub struct RefreshOptions<O, R>
79where
80 R: TokenRefresher + Clone + Send + Sync,
81{
82 layer: RefreshLayer<R>,
83 other: O,
84}
85
86impl<T> From<T> for RefreshOptions<(), T>
87where
88 T: TokenRefresher + Clone + Send + Sync,
89{
90 fn from(refresher: T) -> Self {
91 Self {
92 layer: RefreshLayer::with_refresher(refresher),
93 other: (),
94 }
95 }
96}
97
98impl<C, T, O> IntoService<C> for RefreshOptions<O, T>
99where
100 C: GrpcService<Body>,
101 O: IntoService<C>,
102 O::Service: GrpcService<Body>,
103 RefreshService<O::Service, T>: GrpcService<Body>,
104 T: TokenRefresher + Clone + Send + Sync + 'static,
105{
106 type Service = RefreshService<O::Service, T>;
107 fn into_service(self, channel: C) -> Self::Service {
108 let service = self.other.into_service(channel);
109 self.layer.layer(service)
110 }
111}
112
113#[derive(Clone, Debug)]
115pub struct RetryOptions<O = ()> {
116 layer: RetryLayer,
117 other: O,
118}
119
120impl From<ExponentialBackoff> for RetryOptions<()> {
121 fn from(backoff: ExponentialBackoff) -> Self {
122 Self {
123 layer: RetryLayer { backoff },
124 other: (),
125 }
126 }
127}
128
129impl<C, O> IntoService<C> for RetryOptions<O>
130where
131 C: GrpcService<Body>,
132 O: IntoService<C>,
133 O::Service: GrpcService<Body>,
134 RetryService<O::Service>: GrpcService<Body>,
135{
136 type Service = RetryService<O::Service>;
137 fn into_service(self, channel: C) -> Self::Service {
138 let service = self.other.into_service(channel);
139 self.layer.layer(service)
140 }
141}
142
143#[derive(Clone, Debug)]
145pub struct ChannelBuilder<O = ()> {
146 endpoint: Endpoint,
147 #[cfg(feature = "tracing")]
148 trace_layer: CustomTraceLayer,
149 options: O,
150}
151
152impl From<Endpoint> for ChannelBuilder<()> {
153 fn from(endpoint: Endpoint) -> Self {
154 #[cfg(feature = "tracing")]
155 {
156 let base_url = endpoint.uri().to_string();
157 Self {
158 endpoint,
159 trace_layer: build_trace_layer(base_url, None),
160 options: (),
161 }
162 }
163
164 #[cfg(not(feature = "tracing"))]
165 return Self {
166 endpoint,
167 options: (),
168 };
169 }
170}
171
172impl ChannelBuilder<()> {
173 pub fn from_uri(uri: Uri) -> Self {
175 #[cfg(feature = "tracing")]
176 {
177 let base_url = uri.to_string();
178 Self {
179 endpoint: get_endpoint(uri),
180 trace_layer: build_trace_layer(base_url, None),
181 options: (),
182 }
183 }
184
185 #[cfg(not(feature = "tracing"))]
186 return Self {
187 endpoint: get_endpoint(uri),
188 options: (),
189 };
190 }
191}
192
193#[cfg(feature = "tracing")]
194type TargetService = CustomTraceService;
195#[cfg(not(feature = "tracing"))]
196type TargetService = Channel;
197
198impl<O> ChannelBuilder<O>
199where
200 O: IntoService<TargetService>,
201{
202 #[must_use]
204 pub fn with_timeout(mut self, timeout: Duration) -> Self {
205 self.endpoint = self.endpoint.timeout(timeout);
206 self
207 }
208
209 pub fn with_refresh_layer<T>(
211 self,
212 layer: RefreshLayer<T>,
213 ) -> ChannelBuilder<RefreshOptions<O, T>>
214 where
215 T: TokenRefresher + Clone + Send + Sync,
216 {
217 #[cfg(feature = "tracing")]
218 return ChannelBuilder {
219 endpoint: self.endpoint,
220 trace_layer: self.trace_layer,
221 options: RefreshOptions {
222 layer,
223 other: self.options,
224 },
225 };
226 #[cfg(not(feature = "tracing"))]
227 return ChannelBuilder {
228 endpoint: self.endpoint,
229 options: RefreshOptions {
230 layer,
231 other: self.options,
232 },
233 };
234 }
235
236 pub fn with_token_refresher<T>(self, refresher: T) -> ChannelBuilder<RefreshOptions<O, T>>
238 where
239 T: TokenRefresher + Clone + Send + Sync,
240 {
241 self.with_refresh_layer(RefreshLayer::with_refresher(refresher))
242 }
243
244 pub fn with_qcs_config(
246 self,
247 config: ClientConfiguration,
248 ) -> ChannelBuilder<RefreshOptions<O, ClientConfiguration>> {
249 #[cfg(feature = "tracing")]
250 {
251 let base_url = self.endpoint.uri().to_string();
252 let trace_layer = build_trace_layer(base_url, config.tracing_configuration());
253 let mut builder = self.with_token_refresher(config);
254 builder.trace_layer = trace_layer;
255 builder
256 }
257 #[cfg(not(feature = "tracing"))]
258 {
259 self.with_token_refresher(config)
260 }
261 }
262
263 pub fn with_qcs_profile(
269 self,
270 profile: Option<String>,
271 ) -> Result<ChannelBuilder<RefreshOptions<O, ClientConfiguration>>, LoadError> {
272 let config = match profile {
273 Some(profile) => ClientConfiguration::load_profile(profile)?,
274 None => ClientConfiguration::load_default()?,
275 };
276
277 Ok(self.with_qcs_config(config))
278 }
279
280 pub fn with_retry_layer(self, layer: RetryLayer) -> ChannelBuilder<RetryOptions<O>> {
282 #[cfg(feature = "tracing")]
283 return ChannelBuilder {
284 endpoint: self.endpoint,
285 trace_layer: self.trace_layer,
286 options: RetryOptions {
287 layer,
288 other: self.options,
289 },
290 };
291 #[cfg(not(feature = "tracing"))]
292 return ChannelBuilder {
293 endpoint: self.endpoint,
294 options: RetryOptions {
295 layer,
296 other: self.options,
297 },
298 };
299 }
300
301 pub fn with_retry_backoff(
303 self,
304 backoff: ExponentialBackoff,
305 ) -> ChannelBuilder<RetryOptions<O>> {
306 self.with_retry_layer(RetryLayer { backoff })
307 }
308
309 pub fn with_default_retry(self) -> ChannelBuilder<RetryOptions<O>> {
311 self.with_retry_backoff(default_backoff())
312 }
313
314 #[allow(clippy::result_large_err)]
320 pub fn build(self) -> Result<O::Service, ChannelError> {
321 let channel = get_channel_with_endpoint(&self.endpoint)?;
322 #[cfg(feature = "tracing")]
323 {
324 let traced_channel = self.trace_layer.layer(channel);
325 Ok(self.options.into_service(traced_channel))
326 }
327
328 #[cfg(not(feature = "tracing"))]
329 Ok(self.options.into_service(channel))
330 }
331}
332
333#[allow(clippy::result_large_err)]
341pub fn parse_uri(s: &str) -> Result<Uri, Error<TokenError>> {
342 s.parse().map_err(Error::from)
343}
344
345#[allow(clippy::missing_panics_doc)]
347pub fn get_endpoint(uri: Uri) -> Endpoint {
348 Channel::builder(uri)
349 .user_agent(concat!(
350 "QCS gRPC Client (Rust)/",
351 env!("CARGO_PKG_VERSION")
352 ))
353 .expect("user agent string should be valid")
354 .tls_config(ClientTlsConfig::new().with_enabled_roots())
355 .expect("tls setup should succeed")
356}
357
358pub fn get_endpoint_with_timeout(uri: Uri, timeout: Option<Duration>) -> Endpoint {
360 if let Some(duration) = timeout {
361 get_endpoint(uri).timeout(duration)
362 } else {
363 get_endpoint(uri)
364 }
365}
366
367fn get_env_uri(key: &str) -> Result<Option<Uri>, InvalidUri> {
370 std::env::var(key)
371 .or_else(|_| std::env::var(key.to_lowercase()))
372 .ok()
373 .map(Uri::try_from)
374 .transpose()
375}
376
377fn get_uri_socks_auth(uri: &Uri) -> Result<Option<Auth>, url::ParseError> {
379 let full_url = uri.to_string().parse::<Url>()?;
380 let user = full_url.username();
381 let auth = if user.is_empty() {
382 None
383 } else {
384 let pass = full_url.password().unwrap_or_default();
385 Some(Auth::new(user, pass))
386 };
387 Ok(auth)
388}
389
390#[allow(clippy::result_large_err)]
406pub fn get_channel(uri: Uri) -> Result<Channel, ChannelError> {
407 let endpoint = get_endpoint(uri);
408 get_channel_with_endpoint(&endpoint)
409}
410
411#[allow(clippy::result_large_err)]
428pub fn get_channel_with_timeout(
429 uri: Uri,
430 timeout: Option<Duration>,
431) -> Result<Channel, ChannelError> {
432 let endpoint = get_endpoint_with_timeout(uri, timeout);
433 get_channel_with_endpoint(&endpoint)
434}
435
436#[allow(
455 clippy::similar_names,
456 reason = "http(s)_proxy are similar but precise in this case"
457)]
458#[allow(clippy::result_large_err)]
459pub fn get_channel_with_endpoint(endpoint: &Endpoint) -> Result<Channel, ChannelError> {
460 let https_proxy = get_env_uri("HTTPS_PROXY")?;
461 let http_proxy = get_env_uri("HTTP_PROXY")?;
462
463 let mut connector = HttpConnector::new();
464 connector.enforce_http(false);
465
466 let connect_to = |uri: http::Uri, intercept: Intercept| {
467 let connector = connector.clone();
468 match uri.scheme_str() {
469 Some("socks5") => {
470 let socks_connector = SocksConnector {
471 auth: get_uri_socks_auth(&uri)?,
472 proxy_addr: uri,
473 connector,
474 };
475 Ok(endpoint.connect_with_connector_lazy(socks_connector))
476 }
477 Some("https" | "http") => {
478 let is_http = uri.scheme() == Some(&http::uri::Scheme::HTTP);
479 let proxy = Proxy::new(intercept, uri);
480 let mut proxy_connector = ProxyConnector::from_proxy(connector, proxy)?;
481 if is_http {
482 proxy_connector.set_tls(None);
483 }
484 Ok(endpoint.connect_with_connector_lazy(proxy_connector))
485 }
486 scheme => Err(ChannelError::UnsupportedProtocol(scheme.map(String::from))),
487 }
488 };
489
490 let channel = match (https_proxy, http_proxy) {
491 (None, None) => endpoint.connect_lazy(),
493
494 (Some(https_proxy), None) => connect_to(https_proxy, Intercept::Https)?,
496 (None, Some(http_proxy)) => connect_to(http_proxy, Intercept::Http)?,
497
498 (Some(https_proxy), Some(http_proxy)) => {
501 if https_proxy == http_proxy {
502 connect_to(https_proxy, Intercept::All)?
503 } else {
504 let accepted = [https_proxy.scheme_str(), http_proxy.scheme_str()]
505 .into_iter()
506 .all(|scheme| matches!(scheme, Some("https" | "http")));
507 if accepted {
508 let mut proxy_connector = ProxyConnector::new(connector)?;
509 proxy_connector.extend_proxies(vec![
510 Proxy::new(Intercept::Https, https_proxy),
511 Proxy::new(Intercept::Http, http_proxy),
512 ]);
513 endpoint.connect_with_connector_lazy(proxy_connector)
514 } else {
515 return Err(ChannelError::Mismatch {
516 https_proxy,
517 http_proxy,
518 });
519 }
520 }
521 }
522 };
523
524 Ok(channel)
525}
526
527#[allow(clippy::result_large_err)]
533pub fn get_wrapped_channel(
534 uri: Uri,
535) -> Result<RefreshService<Channel, ClientConfiguration>, Error<TokenError>> {
536 wrap_channel(get_channel(uri)?)
537}
538
539#[must_use]
541pub fn wrap_channel_with<C>(
542 channel: C,
543 config: ClientConfiguration,
544) -> RefreshService<C, ClientConfiguration>
545where
546 C: GrpcService<Body>,
547{
548 ServiceBuilder::new()
549 .layer(RefreshLayer::with_config(config))
550 .service(channel)
551}
552
553pub fn wrap_channel_with_token_refresher<C, T>(
557 channel: C,
558 token_refresher: T,
559) -> RefreshService<C, T>
560where
561 C: GrpcService<Body>,
562 T: TokenRefresher + Clone + Send + Sync,
563{
564 ServiceBuilder::new()
565 .layer(RefreshLayer::with_refresher(token_refresher))
566 .service(channel)
567}
568
569#[allow(clippy::result_large_err)]
575pub fn wrap_channel<C>(
576 channel: C,
577) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
578where
579 C: GrpcService<Body>,
580{
581 Ok(wrap_channel_with(channel, {
582 ClientConfiguration::load_default()?
583 }))
584}
585
586#[allow(clippy::result_large_err)]
592pub fn wrap_channel_with_profile<C>(
593 channel: C,
594 profile: String,
595) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
596where
597 C: GrpcService<Body>,
598{
599 Ok(wrap_channel_with(
600 channel,
601 ClientConfiguration::load_profile(profile)?,
602 ))
603}
604
605pub fn wrap_channel_with_retry<C>(channel: C) -> RetryService<C>
607where
608 C: GrpcService<Body>,
609{
610 ServiceBuilder::new()
611 .layer(RetryLayer::default())
612 .service(channel)
613}
614
/// Wraps `channel` in a trace service built for `base_url` from the given
/// tracing `configuration`.
#[cfg(feature = "tracing")]
pub fn wrap_channel_with_tracing(
    channel: Channel,
    base_url: String,
    configuration: TracingConfiguration,
) -> CustomTraceService {
    let trace_layer = build_trace_layer(base_url, Some(&configuration));
    trace_layer.layer(channel)
}