1use std::time::Duration;
5
6use backoff::ExponentialBackoff;
7use http::{uri::InvalidUri, Uri};
8use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
9use hyper_socks2::{Auth, SocksConnector};
10use hyper_util::client::legacy::connect::HttpConnector;
11use tonic::{
12 body::Body,
13 client::GrpcService,
14 transport::{Channel, ClientTlsConfig, Endpoint},
15};
16use tower::{Layer, ServiceBuilder};
17use url::Url;
18
19use qcs_api_client_common::{
20 backoff::{self, default_backoff},
21 configuration::{tokens::TokenRefresher, ClientConfiguration, LoadError, TokenError},
22};
23
24#[cfg(feature = "tracing")]
25use qcs_api_client_common::tracing_configuration::TracingConfiguration;
26
27#[cfg(feature = "tracing")]
28use super::trace::{build_trace_layer, CustomTraceLayer, CustomTraceService};
29use super::{Error, RefreshLayer, RefreshService, RetryLayer, RetryService};
30
/// Errors that can occur while building a gRPC channel.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ChannelError {
    /// A value could not be parsed as a [`Uri`].
    #[error("Failed to parse URI: {0}")]
    InvalidUri(#[from] InvalidUri),
    /// A value could not be parsed as a [`Url`].
    #[error("Failed to parse URL: {0}")]
    InvalidUrl(#[from] url::ParseError),
    /// The URI scheme is absent (`None`) or is not one that is handled.
    #[error("Protocol is missing or not supported: {0:?}")]
    UnsupportedProtocol(Option<String>),
    /// An I/O error reported while setting up an HTTP proxy connector.
    #[error("HTTP proxy ssl verification failed: {0}")]
    SslFailure(#[from] std::io::Error),
    /// Two different proxies were configured and they cannot be combined.
    #[error("Cannot set separate https and http proxies if one of them is socks5")]
    Mismatch {
        /// The proxy configured for HTTPS traffic.
        https_proxy: Uri,
        /// The proxy configured for HTTP traffic.
        http_proxy: Uri,
    },
}
56
/// Conversion of a set of options into a gRPC service built on top of a channel of type `C`.
pub trait IntoService<C: GrpcService<Body>> {
    /// The service type produced by [`IntoService::into_service`].
    type Service: GrpcService<Body>;

    /// Consume these options and `channel`, returning the resulting service.
    fn into_service(self, channel: C) -> Self::Service;
}
65
66impl<C> IntoService<C> for ()
67where
68 C: GrpcService<Body>,
69{
70 type Service = C;
71 fn into_service(self, channel: C) -> Self::Service {
72 channel
73 }
74}
75
/// Options that apply a [`RefreshLayer`] on top of the service produced by the inner options.
#[derive(Clone, Debug)]
pub struct RefreshOptions<O, R>
where
    R: TokenRefresher + Clone + Send + Sync,
{
    /// The refresh layer applied last, wrapping the service built from `other`.
    layer: RefreshLayer<R>,
    /// The inner options, converted into a service before `layer` is applied.
    other: O,
}
85
86impl<T> From<T> for RefreshOptions<(), T>
87where
88 T: TokenRefresher + Clone + Send + Sync,
89{
90 fn from(refresher: T) -> Self {
91 Self {
92 layer: RefreshLayer::with_refresher(refresher),
93 other: (),
94 }
95 }
96}
97
98impl<C, T, O> IntoService<C> for RefreshOptions<O, T>
99where
100 C: GrpcService<Body>,
101 O: IntoService<C>,
102 O::Service: GrpcService<Body>,
103 RefreshService<O::Service, T>: GrpcService<Body>,
104 T: TokenRefresher + Clone + Send + Sync + 'static,
105{
106 type Service = RefreshService<O::Service, T>;
107 fn into_service(self, channel: C) -> Self::Service {
108 let service = self.other.into_service(channel);
109 self.layer.layer(service)
110 }
111}
112
/// Options that apply a [`RetryLayer`] on top of the service produced by the inner options.
#[derive(Clone, Debug)]
pub struct RetryOptions<O = ()> {
    /// The retry layer applied last, wrapping the service built from `other`.
    layer: RetryLayer,
    /// The inner options, converted into a service before `layer` is applied.
    other: O,
}
119
120impl From<ExponentialBackoff> for RetryOptions<()> {
121 fn from(backoff: ExponentialBackoff) -> Self {
122 Self {
123 layer: RetryLayer { backoff },
124 other: (),
125 }
126 }
127}
128
129impl<C, O> IntoService<C> for RetryOptions<O>
130where
131 C: GrpcService<Body>,
132 O: IntoService<C>,
133 O::Service: GrpcService<Body>,
134 RetryService<O::Service>: GrpcService<Body>,
135{
136 type Service = RetryService<O::Service>;
137 fn into_service(self, channel: C) -> Self::Service {
138 let service = self.other.into_service(channel);
139 self.layer.layer(service)
140 }
141}
142
/// Builder that turns an [`Endpoint`] plus a stack of options `O` into a gRPC service.
#[derive(Clone, Debug)]
pub struct ChannelBuilder<O = ()> {
    /// The endpoint the channel is created from.
    endpoint: Endpoint,
    /// The trace layer applied directly to the channel (only with the `tracing` feature).
    #[cfg(feature = "tracing")]
    trace_layer: CustomTraceLayer,
    /// The accumulated options, converted into a service when the builder is built.
    options: O,
}
151
152impl From<Endpoint> for ChannelBuilder<()> {
153 fn from(endpoint: Endpoint) -> Self {
154 #[cfg(feature = "tracing")]
155 {
156 let base_url = endpoint.uri().to_string();
157 Self {
158 endpoint,
159 trace_layer: build_trace_layer(base_url, None),
160 options: (),
161 }
162 }
163
164 #[cfg(not(feature = "tracing"))]
165 return Self {
166 endpoint,
167 options: (),
168 };
169 }
170}
171
172impl ChannelBuilder<()> {
173 pub fn from_uri(uri: Uri) -> Self {
175 #[cfg(feature = "tracing")]
176 {
177 let base_url = uri.to_string();
178 Self {
179 endpoint: get_endpoint(uri),
180 trace_layer: build_trace_layer(base_url, None),
181 options: (),
182 }
183 }
184
185 #[cfg(not(feature = "tracing"))]
186 return Self {
187 endpoint: get_endpoint(uri),
188 options: (),
189 };
190 }
191}
192
/// The service type handed to the builder's options: the traced channel when the `tracing`
/// feature is enabled.
#[cfg(feature = "tracing")]
type TargetService = CustomTraceService;
/// The service type handed to the builder's options: the bare [`Channel`] when the `tracing`
/// feature is disabled.
#[cfg(not(feature = "tracing"))]
type TargetService = Channel;
197
198impl<O> ChannelBuilder<O>
199where
200 O: IntoService<TargetService>,
201{
202 #[must_use]
204 pub fn with_timeout(mut self, timeout: Duration) -> Self {
205 self.endpoint = self.endpoint.timeout(timeout);
206 self
207 }
208
209 pub fn with_refresh_layer<T>(
211 self,
212 layer: RefreshLayer<T>,
213 ) -> ChannelBuilder<RefreshOptions<O, T>>
214 where
215 T: TokenRefresher + Clone + Send + Sync,
216 {
217 #[cfg(feature = "tracing")]
218 return ChannelBuilder {
219 endpoint: self.endpoint,
220 trace_layer: self.trace_layer,
221 options: RefreshOptions {
222 layer,
223 other: self.options,
224 },
225 };
226 #[cfg(not(feature = "tracing"))]
227 return ChannelBuilder {
228 endpoint: self.endpoint,
229 options: RefreshOptions {
230 layer,
231 other: self.options,
232 },
233 };
234 }
235
236 pub fn with_token_refresher<T>(self, refresher: T) -> ChannelBuilder<RefreshOptions<O, T>>
238 where
239 T: TokenRefresher + Clone + Send + Sync,
240 {
241 self.with_refresh_layer(RefreshLayer::with_refresher(refresher))
242 }
243
244 pub fn with_qcs_config(
246 self,
247 config: ClientConfiguration,
248 ) -> ChannelBuilder<RefreshOptions<O, ClientConfiguration>> {
249 #[cfg(feature = "tracing")]
250 {
251 let base_url = self.endpoint.uri().to_string();
252 let trace_layer = build_trace_layer(base_url, config.tracing_configuration());
253 let mut builder = self.with_token_refresher(config);
254 builder.trace_layer = trace_layer;
255 builder
256 }
257 #[cfg(not(feature = "tracing"))]
258 {
259 self.with_token_refresher(config)
260 }
261 }
262
263 pub fn with_qcs_profile(
269 self,
270 profile: Option<String>,
271 ) -> Result<ChannelBuilder<RefreshOptions<O, ClientConfiguration>>, LoadError> {
272 let config = match profile {
273 Some(profile) => ClientConfiguration::load_profile(profile)?,
274 None => ClientConfiguration::load_default()?,
275 };
276
277 Ok(self.with_qcs_config(config))
278 }
279
280 pub fn with_retry_layer(self, layer: RetryLayer) -> ChannelBuilder<RetryOptions<O>> {
282 #[cfg(feature = "tracing")]
283 return ChannelBuilder {
284 endpoint: self.endpoint,
285 trace_layer: self.trace_layer,
286 options: RetryOptions {
287 layer,
288 other: self.options,
289 },
290 };
291 #[cfg(not(feature = "tracing"))]
292 return ChannelBuilder {
293 endpoint: self.endpoint,
294 options: RetryOptions {
295 layer,
296 other: self.options,
297 },
298 };
299 }
300
301 pub fn with_retry_backoff(
303 self,
304 backoff: ExponentialBackoff,
305 ) -> ChannelBuilder<RetryOptions<O>> {
306 self.with_retry_layer(RetryLayer { backoff })
307 }
308
309 pub fn with_default_retry(self) -> ChannelBuilder<RetryOptions<O>> {
311 self.with_retry_backoff(default_backoff())
312 }
313
314 #[allow(clippy::result_large_err)]
320 pub fn build(self) -> Result<O::Service, ChannelError> {
321 let channel = get_channel_with_endpoint(&self.endpoint)?;
322 #[cfg(feature = "tracing")]
323 {
324 let traced_channel = self.trace_layer.layer(channel);
325 Ok(self.options.into_service(traced_channel))
326 }
327
328 #[cfg(not(feature = "tracing"))]
329 Ok(self.options.into_service(channel))
330 }
331}
332
333#[allow(clippy::result_large_err)]
341pub fn parse_uri(s: &str) -> Result<Uri, Error<TokenError>> {
342 s.parse().map_err(Error::from)
343}
344
345#[allow(clippy::missing_panics_doc)]
347pub fn get_endpoint(uri: Uri) -> Endpoint {
348 Channel::builder(uri)
349 .user_agent(concat!(
350 "QCS gRPC Client (Rust)/",
351 env!("CARGO_PKG_VERSION")
352 ))
353 .expect("user agent string should be valid")
354 .http2_adaptive_window(true)
355 .tls_config(ClientTlsConfig::new().with_enabled_roots())
356 .expect("tls setup should succeed")
357}
358
359pub fn get_endpoint_with_timeout(uri: Uri, timeout: Option<Duration>) -> Endpoint {
361 if let Some(duration) = timeout {
362 get_endpoint(uri).timeout(duration)
363 } else {
364 get_endpoint(uri)
365 }
366}
367
368fn get_env_uri(key: &str) -> Result<Option<Uri>, InvalidUri> {
371 std::env::var(key)
372 .or_else(|_| std::env::var(key.to_lowercase()))
373 .ok()
374 .map(Uri::try_from)
375 .transpose()
376}
377
378fn get_uri_socks_auth(uri: &Uri) -> Result<Option<Auth>, url::ParseError> {
380 let full_url = uri.to_string().parse::<Url>()?;
381 let user = full_url.username();
382 let auth = if user.is_empty() {
383 None
384 } else {
385 let pass = full_url.password().unwrap_or_default();
386 Some(Auth::new(user, pass))
387 };
388 Ok(auth)
389}
390
391#[allow(clippy::result_large_err)]
407pub fn get_channel(uri: Uri) -> Result<Channel, ChannelError> {
408 let endpoint = get_endpoint(uri);
409 get_channel_with_endpoint(&endpoint)
410}
411
412#[allow(clippy::result_large_err)]
429pub fn get_channel_with_timeout(
430 uri: Uri,
431 timeout: Option<Duration>,
432) -> Result<Channel, ChannelError> {
433 let endpoint = get_endpoint_with_timeout(uri, timeout);
434 get_channel_with_endpoint(&endpoint)
435}
436
/// Create a lazily-connecting [`Channel`] for `endpoint`, routed through any proxy named by
/// the `HTTPS_PROXY` / `HTTP_PROXY` environment variables (see [`get_env_uri`] for how they
/// are read).
///
/// Proxy selection:
/// - neither variable set: connect directly;
/// - only one set: connect through it, intercepting only the matching scheme;
/// - both set to the same URI: connect through it, intercepting all traffic;
/// - both set to different URIs: accepted only when both use the `http` or `https` scheme.
///
/// A single proxy may use the `socks5`, `http`, or `https` scheme.
///
/// # Errors
///
/// - [`ChannelError::InvalidUri`] if a proxy variable does not hold a valid URI.
/// - [`ChannelError::InvalidUrl`] if the credentials of a `socks5` proxy URI cannot be parsed.
/// - [`ChannelError::UnsupportedProtocol`] if a single proxy uses any other scheme.
/// - [`ChannelError::Mismatch`] if two different proxies are set and either is not `http`/`https`.
/// - An error converted from the failure of constructing the HTTP proxy connector
///   (presumably [`ChannelError::SslFailure`]; confirm against `ProxyConnector`).
#[allow(
    clippy::similar_names,
    reason = "http(s)_proxy are similar but precise in this case"
)]
#[allow(clippy::result_large_err)]
pub fn get_channel_with_endpoint(endpoint: &Endpoint) -> Result<Channel, ChannelError> {
    let https_proxy = get_env_uri("HTTPS_PROXY")?;
    let http_proxy = get_env_uri("HTTP_PROXY")?;

    let mut connector = HttpConnector::new();
    // NOTE(review): presumably lets the connector accept URIs whose scheme is not plain
    // `http`; confirm against the `HttpConnector::enforce_http` documentation.
    connector.enforce_http(false);

    // Connect `endpoint` through a single proxy at `uri`, choosing the connector by scheme.
    let connect_to = |uri: http::Uri, intercept: Intercept| {
        // Each call works on its own copy; the original is still needed for the two-proxy case.
        let connector = connector.clone();
        match uri.scheme_str() {
            Some("socks5") => {
                let socks_connector = SocksConnector {
                    auth: get_uri_socks_auth(&uri)?,
                    proxy_addr: uri,
                    connector,
                };
                Ok(endpoint.connect_with_connector_lazy(socks_connector))
            }
            Some("https" | "http") => {
                // Record the scheme before `uri` is moved into the proxy.
                let is_http = uri.scheme() == Some(&http::uri::Scheme::HTTP);
                let proxy = Proxy::new(intercept, uri);
                let mut proxy_connector = ProxyConnector::from_proxy(connector, proxy)?;
                // A plain-http proxy gets the connector's TLS configuration cleared.
                if is_http {
                    proxy_connector.set_tls(None);
                }
                Ok(endpoint.connect_with_connector_lazy(proxy_connector))
            }
            scheme => Err(ChannelError::UnsupportedProtocol(scheme.map(String::from))),
        }
    };

    let channel = match (https_proxy, http_proxy) {
        // No proxy configured: connect directly.
        (None, None) => endpoint.connect_lazy(),

        // Exactly one proxy configured: intercept only the matching scheme.
        (Some(https_proxy), None) => connect_to(https_proxy, Intercept::Https)?,
        (None, Some(http_proxy)) => connect_to(http_proxy, Intercept::Http)?,

        (Some(https_proxy), Some(http_proxy)) => {
            if https_proxy == http_proxy {
                // Same proxy for both schemes: a single connector intercepts everything.
                connect_to(https_proxy, Intercept::All)?
            } else {
                // Two different proxies share one `ProxyConnector` only when both are
                // http/https proxies.
                let accepted = [https_proxy.scheme_str(), http_proxy.scheme_str()]
                    .into_iter()
                    .all(|scheme| matches!(scheme, Some("https" | "http")));
                if accepted {
                    let mut proxy_connector = ProxyConnector::new(connector)?;
                    proxy_connector.extend_proxies(vec![
                        Proxy::new(Intercept::Https, https_proxy),
                        Proxy::new(Intercept::Http, http_proxy),
                    ]);
                    endpoint.connect_with_connector_lazy(proxy_connector)
                } else {
                    // NOTE(review): also reached when a scheme is missing or otherwise
                    // unsupported, although the error message only mentions socks5.
                    return Err(ChannelError::Mismatch {
                        https_proxy,
                        http_proxy,
                    });
                }
            }
        }
    };

    Ok(channel)
}
527
528#[allow(clippy::result_large_err)]
534pub fn get_wrapped_channel(
535 uri: Uri,
536) -> Result<RefreshService<Channel, ClientConfiguration>, Error<TokenError>> {
537 wrap_channel(get_channel(uri)?)
538}
539
540#[must_use]
542pub fn wrap_channel_with<C>(
543 channel: C,
544 config: ClientConfiguration,
545) -> RefreshService<C, ClientConfiguration>
546where
547 C: GrpcService<Body>,
548{
549 ServiceBuilder::new()
550 .layer(RefreshLayer::with_config(config))
551 .service(channel)
552}
553
554pub fn wrap_channel_with_token_refresher<C, T>(
558 channel: C,
559 token_refresher: T,
560) -> RefreshService<C, T>
561where
562 C: GrpcService<Body>,
563 T: TokenRefresher + Clone + Send + Sync,
564{
565 ServiceBuilder::new()
566 .layer(RefreshLayer::with_refresher(token_refresher))
567 .service(channel)
568}
569
570#[allow(clippy::result_large_err)]
576pub fn wrap_channel<C>(
577 channel: C,
578) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
579where
580 C: GrpcService<Body>,
581{
582 Ok(wrap_channel_with(channel, {
583 ClientConfiguration::load_default()?
584 }))
585}
586
587#[allow(clippy::result_large_err)]
593pub fn wrap_channel_with_profile<C>(
594 channel: C,
595 profile: String,
596) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
597where
598 C: GrpcService<Body>,
599{
600 Ok(wrap_channel_with(
601 channel,
602 ClientConfiguration::load_profile(profile)?,
603 ))
604}
605
606pub fn wrap_channel_with_retry<C>(channel: C) -> RetryService<C>
608where
609 C: GrpcService<Body>,
610{
611 ServiceBuilder::new()
612 .layer(RetryLayer::default())
613 .service(channel)
614}
615
/// Wrap `channel` in a trace layer built from `base_url` and `configuration`.
#[cfg(feature = "tracing")]
pub fn wrap_channel_with_tracing(
    channel: Channel,
    base_url: String,
    configuration: TracingConfiguration,
) -> CustomTraceService {
    let trace_layer = build_trace_layer(base_url, Some(&configuration));
    trace_layer.layer(channel)
}