1use std::time::Duration;
5
6use backoff::ExponentialBackoff;
7use http::{uri::InvalidUri, Uri};
8use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
9use hyper_socks2::{Auth, SocksConnector};
10use hyper_util::client::legacy::connect::HttpConnector;
11use tonic::{
12 body::BoxBody,
13 client::GrpcService,
14 transport::{Channel, ClientTlsConfig, Endpoint},
15};
16use tower::{Layer, ServiceBuilder};
17use url::Url;
18
19use qcs_api_client_common::{
20 backoff::{self, default_backoff},
21 configuration::{ClientConfiguration, LoadError, TokenError, TokenRefresher},
22};
23
24#[cfg(feature = "tracing")]
25use qcs_api_client_common::tracing_configuration::TracingConfiguration;
26
27#[cfg(feature = "tracing")]
28use super::trace::{build_trace_layer, CustomTraceLayer, CustomTraceService};
29use super::{Error, RefreshLayer, RefreshService, RetryLayer, RetryService};
30
/// Errors that can occur while constructing a gRPC [`Channel`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ChannelError {
    /// A value could not be parsed as a [`Uri`] (for example a proxy
    /// environment variable).
    #[error("Failed to parse URI: {0}")]
    InvalidUri(#[from] InvalidUri),
    /// A URI could not be re-parsed as a [`Url`]; in this file that happens
    /// while extracting SOCKS credentials from a proxy URI.
    #[error("Failed to parse URL: {0}")]
    InvalidUrl(#[from] url::ParseError),
    /// The proxy URI has no scheme, or a scheme other than `socks5`, `http`
    /// or `https`. The payload is the offending scheme, if any.
    #[error("Protocol is missing or not supported: {0:?}")]
    UnsupportedProtocol(Option<String>),
    /// An I/O error raised while constructing the HTTP(S) proxy connector.
    #[error("HTTP proxy ssl verification failed: {0}")]
    SslFailure(#[from] std::io::Error),
    /// Distinct HTTPS and HTTP proxies were configured and at least one of
    /// them does not use the `http` or `https` scheme.
    #[error("Cannot set separate https and http proxies if one of them is socks5")]
    Mismatch {
        /// The proxy URI configured for HTTPS traffic.
        https_proxy: Uri,
        /// The proxy URI configured for HTTP traffic.
        http_proxy: Uri,
    },
}
56
57pub trait IntoService<C: GrpcService<BoxBody>> {
59 type Service: GrpcService<BoxBody>;
61
62 fn into_service(self, channel: C) -> Self::Service;
64}
65
66impl<C> IntoService<C> for ()
67where
68 C: GrpcService<BoxBody>,
69{
70 type Service = C;
71 fn into_service(self, channel: C) -> Self::Service {
72 channel
73 }
74}
75
76#[derive(Clone, Debug)]
78pub struct RefreshOptions<O, R>
79where
80 R: TokenRefresher + Clone + Send + Sync,
81{
82 layer: RefreshLayer<R>,
83 other: O,
84}
85
86impl<T> From<T> for RefreshOptions<(), T>
87where
88 T: TokenRefresher + Clone + Send + Sync,
89{
90 fn from(refresher: T) -> Self {
91 Self {
92 layer: RefreshLayer::with_refresher(refresher),
93 other: (),
94 }
95 }
96}
97
98impl<C, T, O> IntoService<C> for RefreshOptions<O, T>
99where
100 C: GrpcService<BoxBody>,
101 O: IntoService<C>,
102 O::Service: GrpcService<BoxBody>,
103 RefreshService<O::Service, T>: GrpcService<BoxBody>,
104 T: TokenRefresher + Clone + Send + Sync + 'static,
105{
106 type Service = RefreshService<O::Service, T>;
107 fn into_service(self, channel: C) -> Self::Service {
108 let service = self.other.into_service(channel);
109 self.layer.layer(service)
110 }
111}
112
/// Builder options that add a [`RetryLayer`] on top of whatever `other`
/// produces.
#[derive(Clone, Debug)]
pub struct RetryOptions<O = ()> {
    /// The retry layer applied outermost by these options.
    layer: RetryLayer,
    /// The options applied to the channel before the retry layer wraps it.
    other: O,
}
119
120impl From<ExponentialBackoff> for RetryOptions<()> {
121 fn from(backoff: ExponentialBackoff) -> Self {
122 Self {
123 layer: RetryLayer { backoff },
124 other: (),
125 }
126 }
127}
128
129impl<C, O> IntoService<C> for RetryOptions<O>
130where
131 C: GrpcService<BoxBody>,
132 O: IntoService<C>,
133 O::Service: GrpcService<BoxBody>,
134 RetryService<O::Service>: GrpcService<BoxBody>,
135{
136 type Service = RetryService<O::Service>;
137 fn into_service(self, channel: C) -> Self::Service {
138 let service = self.other.into_service(channel);
139 self.layer.layer(service)
140 }
141}
142
/// Builder for a gRPC service backed by a [`Channel`].
///
/// The type parameter `O` accumulates the layer options (refresh, retry, ...)
/// that are applied to the channel when the builder is built.
#[derive(Clone, Debug)]
pub struct ChannelBuilder<O = ()> {
    /// The endpoint the channel is created from.
    endpoint: Endpoint,
    /// The trace layer applied directly to the channel; only present when the
    /// `tracing` feature is enabled.
    #[cfg(feature = "tracing")]
    trace_layer: CustomTraceLayer,
    /// The accumulated layer options.
    options: O,
}
151
152impl From<Endpoint> for ChannelBuilder<()> {
153 fn from(endpoint: Endpoint) -> Self {
154 #[cfg(feature = "tracing")]
155 {
156 let base_url = endpoint.uri().to_string();
157 Self {
158 endpoint,
159 trace_layer: build_trace_layer(base_url, None),
160 options: (),
161 }
162 }
163
164 #[cfg(not(feature = "tracing"))]
165 return Self {
166 endpoint,
167 options: (),
168 };
169 }
170}
171
172impl ChannelBuilder<()> {
173 pub fn from_uri(uri: Uri) -> Self {
175 #[cfg(feature = "tracing")]
176 {
177 let base_url = uri.to_string();
178 Self {
179 endpoint: get_endpoint(uri),
180 trace_layer: build_trace_layer(base_url, None),
181 options: (),
182 }
183 }
184
185 #[cfg(not(feature = "tracing"))]
186 return Self {
187 endpoint: get_endpoint(uri),
188 options: (),
189 };
190 }
191}
192
/// The service type the builder options are applied to: the traced channel
/// when the `tracing` feature is enabled.
#[cfg(feature = "tracing")]
type TargetService = CustomTraceService;
/// The service type the builder options are applied to: the bare [`Channel`]
/// when the `tracing` feature is disabled.
#[cfg(not(feature = "tracing"))]
type TargetService = Channel;
197
198impl<O> ChannelBuilder<O>
199where
200 O: IntoService<TargetService>,
201{
202 #[must_use]
204 pub fn with_timeout(mut self, timeout: Duration) -> Self {
205 self.endpoint = self.endpoint.timeout(timeout);
206 self
207 }
208
209 pub fn with_refresh_layer<T>(
211 self,
212 layer: RefreshLayer<T>,
213 ) -> ChannelBuilder<RefreshOptions<O, T>>
214 where
215 T: TokenRefresher + Clone + Send + Sync,
216 {
217 #[cfg(feature = "tracing")]
218 return ChannelBuilder {
219 endpoint: self.endpoint,
220 trace_layer: self.trace_layer,
221 options: RefreshOptions {
222 layer,
223 other: self.options,
224 },
225 };
226 #[cfg(not(feature = "tracing"))]
227 return ChannelBuilder {
228 endpoint: self.endpoint,
229 options: RefreshOptions {
230 layer,
231 other: self.options,
232 },
233 };
234 }
235
236 pub fn with_token_refresher<T>(self, refresher: T) -> ChannelBuilder<RefreshOptions<O, T>>
238 where
239 T: TokenRefresher + Clone + Send + Sync,
240 {
241 self.with_refresh_layer(RefreshLayer::with_refresher(refresher))
242 }
243
244 pub fn with_qcs_config(
246 self,
247 config: ClientConfiguration,
248 ) -> ChannelBuilder<RefreshOptions<O, ClientConfiguration>> {
249 #[cfg(feature = "tracing")]
250 {
251 let base_url = self.endpoint.uri().to_string();
252 let trace_layer = build_trace_layer(base_url, config.tracing_configuration());
253 let mut builder = self.with_token_refresher(config);
254 builder.trace_layer = trace_layer;
255 builder
256 }
257 #[cfg(not(feature = "tracing"))]
258 {
259 self.with_token_refresher(config)
260 }
261 }
262
263 pub fn with_qcs_profile(
269 self,
270 profile: Option<String>,
271 ) -> Result<ChannelBuilder<RefreshOptions<O, ClientConfiguration>>, LoadError> {
272 let config = match profile {
273 Some(profile) => ClientConfiguration::load_profile(profile)?,
274 None => ClientConfiguration::load_default()?,
275 };
276
277 Ok(self.with_qcs_config(config))
278 }
279
280 pub fn with_retry_layer(self, layer: RetryLayer) -> ChannelBuilder<RetryOptions<O>> {
282 #[cfg(feature = "tracing")]
283 return ChannelBuilder {
284 endpoint: self.endpoint,
285 trace_layer: self.trace_layer,
286 options: RetryOptions {
287 layer,
288 other: self.options,
289 },
290 };
291 #[cfg(not(feature = "tracing"))]
292 return ChannelBuilder {
293 endpoint: self.endpoint,
294 options: RetryOptions {
295 layer,
296 other: self.options,
297 },
298 };
299 }
300
301 pub fn with_retry_backoff(
303 self,
304 backoff: ExponentialBackoff,
305 ) -> ChannelBuilder<RetryOptions<O>> {
306 self.with_retry_layer(RetryLayer { backoff })
307 }
308
309 pub fn with_default_retry(self) -> ChannelBuilder<RetryOptions<O>> {
311 self.with_retry_backoff(default_backoff())
312 }
313
314 pub fn build(self) -> Result<O::Service, ChannelError> {
320 let channel = get_channel_with_endpoint(&self.endpoint)?;
321 #[cfg(feature = "tracing")]
322 {
323 let traced_channel = self.trace_layer.layer(channel);
324 Ok(self.options.into_service(traced_channel))
325 }
326
327 #[cfg(not(feature = "tracing"))]
328 Ok(self.options.into_service(channel))
329 }
330}
331
332pub fn parse_uri(s: &str) -> Result<Uri, Error<TokenError>> {
340 s.parse().map_err(Error::from)
341}
342
343#[allow(clippy::missing_panics_doc)]
345pub fn get_endpoint(uri: Uri) -> Endpoint {
346 Channel::builder(uri)
347 .user_agent(concat!(
348 "QCS gRPC Client (Rust)/",
349 env!("CARGO_PKG_VERSION")
350 ))
351 .expect("user agent string should be valid")
352 .tls_config(ClientTlsConfig::new().with_enabled_roots())
353 .expect("tls setup should succeed")
354}
355
356pub fn get_endpoint_with_timeout(uri: Uri, timeout: Option<Duration>) -> Endpoint {
358 if let Some(duration) = timeout {
359 get_endpoint(uri).timeout(duration)
360 } else {
361 get_endpoint(uri)
362 }
363}
364
365fn get_env_uri(key: &str) -> Result<Option<Uri>, InvalidUri> {
368 std::env::var(key)
369 .or_else(|_| std::env::var(key.to_lowercase()))
370 .ok()
371 .map(Uri::try_from)
372 .transpose()
373}
374
375fn get_uri_socks_auth(uri: &Uri) -> Result<Option<Auth>, url::ParseError> {
377 let full_url = uri.to_string().parse::<Url>()?;
378 let user = full_url.username();
379 let auth = if user.is_empty() {
380 None
381 } else {
382 let pass = full_url.password().unwrap_or_default();
383 Some(Auth::new(user, pass))
384 };
385 Ok(auth)
386}
387
388pub fn get_channel(uri: Uri) -> Result<Channel, ChannelError> {
404 let endpoint = get_endpoint(uri);
405 get_channel_with_endpoint(&endpoint)
406}
407
408pub fn get_channel_with_timeout(
425 uri: Uri,
426 timeout: Option<Duration>,
427) -> Result<Channel, ChannelError> {
428 let endpoint = get_endpoint_with_timeout(uri, timeout);
429 get_channel_with_endpoint(&endpoint)
430}
431
432#[allow(clippy::similar_names)] pub fn get_channel_with_endpoint(endpoint: &Endpoint) -> Result<Channel, ChannelError> {
452 let https_proxy = get_env_uri("HTTPS_PROXY")?;
453 let http_proxy = get_env_uri("HTTP_PROXY")?;
454
455 let mut connector = HttpConnector::new();
456 connector.enforce_http(false);
457
458 let connect_to = |uri: http::Uri, intercept: Intercept| {
459 let connector = connector.clone();
460 match uri.scheme_str() {
461 Some("socks5") => {
462 let socks_connector = SocksConnector {
463 auth: get_uri_socks_auth(&uri)?,
464 proxy_addr: uri,
465 connector,
466 };
467 Ok(endpoint.connect_with_connector_lazy(socks_connector))
468 }
469 Some("https" | "http") => {
470 let is_http = uri.scheme() == Some(&http::uri::Scheme::HTTP);
471 let proxy = Proxy::new(intercept, uri);
472 let mut proxy_connector = ProxyConnector::from_proxy(connector, proxy)?;
473 if is_http {
474 proxy_connector.set_tls(None);
475 }
476 Ok(endpoint.connect_with_connector_lazy(proxy_connector))
477 }
478 scheme => Err(ChannelError::UnsupportedProtocol(scheme.map(String::from))),
479 }
480 };
481
482 let channel = match (https_proxy, http_proxy) {
483 (None, None) => endpoint.connect_lazy(),
485
486 (Some(https_proxy), None) => connect_to(https_proxy, Intercept::Https)?,
488 (None, Some(http_proxy)) => connect_to(http_proxy, Intercept::Http)?,
489
490 (Some(https_proxy), Some(http_proxy)) => {
493 if https_proxy == http_proxy {
494 connect_to(https_proxy, Intercept::All)?
495 } else {
496 let accepted = [https_proxy.scheme_str(), http_proxy.scheme_str()]
497 .into_iter()
498 .all(|scheme| matches!(scheme, Some("https" | "http")));
499 if accepted {
500 let mut proxy_connector = ProxyConnector::new(connector)?;
501 proxy_connector.extend_proxies(vec![
502 Proxy::new(Intercept::Https, https_proxy),
503 Proxy::new(Intercept::Http, http_proxy),
504 ]);
505 endpoint.connect_with_connector_lazy(proxy_connector)
506 } else {
507 return Err(ChannelError::Mismatch {
508 https_proxy,
509 http_proxy,
510 });
511 }
512 }
513 }
514 };
515
516 Ok(channel)
517}
518
519pub fn get_wrapped_channel(
525 uri: Uri,
526) -> Result<RefreshService<Channel, ClientConfiguration>, Error<TokenError>> {
527 wrap_channel(get_channel(uri)?)
528}
529
530#[must_use]
532pub fn wrap_channel_with<C>(
533 channel: C,
534 config: ClientConfiguration,
535) -> RefreshService<C, ClientConfiguration>
536where
537 C: GrpcService<BoxBody>,
538{
539 ServiceBuilder::new()
540 .layer(RefreshLayer::with_config(config))
541 .service(channel)
542}
543
544pub fn wrap_channel_with_token_refresher<C, T>(
548 channel: C,
549 token_refresher: T,
550) -> RefreshService<C, T>
551where
552 C: GrpcService<BoxBody>,
553 T: TokenRefresher + Clone + Send + Sync,
554{
555 ServiceBuilder::new()
556 .layer(RefreshLayer::with_refresher(token_refresher))
557 .service(channel)
558}
559
560pub fn wrap_channel<C>(
566 channel: C,
567) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
568where
569 C: GrpcService<BoxBody>,
570{
571 Ok(wrap_channel_with(channel, {
572 ClientConfiguration::load_default()?
573 }))
574}
575
576pub fn wrap_channel_with_profile<C>(
582 channel: C,
583 profile: String,
584) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
585where
586 C: GrpcService<BoxBody>,
587{
588 Ok(wrap_channel_with(
589 channel,
590 ClientConfiguration::load_profile(profile)?,
591 ))
592}
593
594pub fn wrap_channel_with_retry<C>(channel: C) -> RetryService<C>
596where
597 C: GrpcService<BoxBody>,
598{
599 ServiceBuilder::new()
600 .layer(RetryLayer::default())
601 .service(channel)
602}
603
/// Wraps `channel` in a trace layer built from `base_url` and `configuration`.
#[cfg(feature = "tracing")]
pub fn wrap_channel_with_tracing(
    channel: Channel,
    base_url: String,
    configuration: TracingConfiguration,
) -> CustomTraceService {
    let trace_layer = build_trace_layer(base_url, Some(&configuration));
    ServiceBuilder::new().layer(trace_layer).service(channel)
}