qcs_api_client_grpc/tonic/
channel.rs

1//! Utilities for creating and configuring gRPC channels.
2//!
3//! The [`ChannelBuilder`] is the primary entry point for configuring a gRPC channel.
4use std::time::Duration;
5
6use backoff::ExponentialBackoff;
7use http::{uri::InvalidUri, Uri};
8use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
9use hyper_socks2::{Auth, SocksConnector};
10use hyper_util::client::legacy::connect::HttpConnector;
11use tonic::{
12    body::Body,
13    client::GrpcService,
14    transport::{Channel, ClientTlsConfig, Endpoint},
15};
16use tower::{Layer, ServiceBuilder};
17use url::Url;
18
19use qcs_api_client_common::{
20    backoff::{self, default_backoff},
21    configuration::{tokens::TokenRefresher, ClientConfiguration, LoadError, TokenError},
22};
23
24#[cfg(feature = "tracing")]
25use qcs_api_client_common::tracing_configuration::TracingConfiguration;
26
27#[cfg(feature = "tracing")]
28use super::trace::{build_trace_layer, CustomTraceLayer, CustomTraceService};
29use super::{Error, RefreshLayer, RefreshService, RetryLayer, RetryService};
30
/// Errors that may occur when configuring a channel connection.
///
/// The enum is `#[non_exhaustive]`, so downstream `match` expressions need a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ChannelError {
    /// Failed to parse a string (such as a proxy environment variable value) as a [`Uri`].
    #[error("Failed to parse URI: {0}")]
    InvalidUri(#[from] InvalidUri),
    /// Failed to parse a proxy URI as a [`Url`]. The [`Url`] form is used to derive the
    /// user/password for `socks5` proxies.
    #[error("Failed to parse URL: {0}")]
    InvalidUrl(#[from] url::ParseError),
    /// Unsupported proxy protocol. Carries the offending scheme, or `None` if the proxy URI
    /// had no scheme at all.
    #[error("Protocol is missing or not supported: {0:?}")]
    UnsupportedProtocol(Option<String>),
    /// Proxy ssl verification failed. Converted from the [`std::io::Error`] returned while
    /// constructing a `ProxyConnector`.
    #[error("HTTP proxy ssl verification failed: {0}")]
    SslFailure(#[from] std::io::Error),
    /// Proxy targets do not agree: both `HTTPS_PROXY` and `HTTP_PROXY` are set to different
    /// values that cannot be served by a single `ProxyConnector`.
    #[error("Cannot set separate https and http proxies if one of them is socks5")]
    Mismatch {
        /// The URI of the HTTPS proxy.
        https_proxy: Uri,
        /// The URI of the HTTP proxy.
        http_proxy: Uri,
    },
}
56
/// Defines the logic for turning some object into a [`GrpcService`].
///
/// Implementors consume themselves together with a base service `C` and return the
/// (possibly wrapped) service that requests should be sent through.
pub trait IntoService<C: GrpcService<Body>> {
    /// The service type that will be returned.
    type Service: GrpcService<Body>;

    /// Convert the object into a service, using `channel` as the underlying service.
    fn into_service(self, channel: C) -> Self::Service;
}
65
/// The unit type represents "no extra options": the channel is returned unchanged.
impl<C> IntoService<C> for ()
where
    C: GrpcService<Body>,
{
    type Service = C;
    /// Return `channel` as-is, without wrapping it in any layer.
    fn into_service(self, channel: C) -> Self::Service {
        channel
    }
}
75
/// Options for configuring QCS authentication.
///
/// Pairs a [`RefreshLayer`] with the options `O` that were configured before it, so that
/// several option sets can be nested inside one another.
#[derive(Clone, Debug)]
pub struct RefreshOptions<O, R>
where
    R: TokenRefresher + Clone + Send + Sync,
{
    // The layer applied on top of whatever service `other` produces.
    layer: RefreshLayer<R>,
    // Previously configured options; `()` when there are none.
    other: O,
}
85
86impl<T> From<T> for RefreshOptions<(), T>
87where
88    T: TokenRefresher + Clone + Send + Sync,
89{
90    fn from(refresher: T) -> Self {
91        Self {
92            layer: RefreshLayer::with_refresher(refresher),
93            other: (),
94        }
95    }
96}
97
98impl<C, T, O> IntoService<C> for RefreshOptions<O, T>
99where
100    C: GrpcService<Body>,
101    O: IntoService<C>,
102    O::Service: GrpcService<Body>,
103    RefreshService<O::Service, T>: GrpcService<Body>,
104    T: TokenRefresher + Clone + Send + Sync + 'static,
105{
106    type Service = RefreshService<O::Service, T>;
107    fn into_service(self, channel: C) -> Self::Service {
108        let service = self.other.into_service(channel);
109        self.layer.layer(service)
110    }
111}
112
/// Options for configuring retry logic.
///
/// Pairs a [`RetryLayer`] with the options `O` that were configured before it.
#[derive(Clone, Debug)]
pub struct RetryOptions<O = ()> {
    // The layer applied on top of whatever service `other` produces.
    layer: RetryLayer,
    // Previously configured options; `()` when there are none.
    other: O,
}
119
120impl From<ExponentialBackoff> for RetryOptions<()> {
121    fn from(backoff: ExponentialBackoff) -> Self {
122        Self {
123            layer: RetryLayer { backoff },
124            other: (),
125        }
126    }
127}
128
129impl<C, O> IntoService<C> for RetryOptions<O>
130where
131    C: GrpcService<Body>,
132    O: IntoService<C>,
133    O::Service: GrpcService<Body>,
134    RetryService<O::Service>: GrpcService<Body>,
135{
136    type Service = RetryService<O::Service>;
137    fn into_service(self, channel: C) -> Self::Service {
138        let service = self.other.into_service(channel);
139        self.layer.layer(service)
140    }
141}
142
/// Builder for configuring a [`Channel`].
///
/// The type parameter `O` records which option layers (refresh, retry, ...) have been
/// requested so far; it starts as `()` and grows with each `with_*` call that adds a layer.
#[derive(Clone, Debug)]
pub struct ChannelBuilder<O = ()> {
    // Endpoint the channel will be created from when the builder is built.
    endpoint: Endpoint,
    // Trace layer applied directly around the channel; only present with the `tracing` feature.
    #[cfg(feature = "tracing")]
    trace_layer: CustomTraceLayer,
    // Accumulated option layers, applied on top of the (optionally traced) channel.
    options: O,
}
151
152impl From<Endpoint> for ChannelBuilder<()> {
153    fn from(endpoint: Endpoint) -> Self {
154        #[cfg(feature = "tracing")]
155        {
156            let base_url = endpoint.uri().to_string();
157            Self {
158                endpoint,
159                trace_layer: build_trace_layer(base_url, None),
160                options: (),
161            }
162        }
163
164        #[cfg(not(feature = "tracing"))]
165        return Self {
166            endpoint,
167            options: (),
168        };
169    }
170}
171
172impl ChannelBuilder<()> {
173    /// Create a [`ChannelBuilder`] using the given [`Uri`]
174    pub fn from_uri(uri: Uri) -> Self {
175        #[cfg(feature = "tracing")]
176        {
177            let base_url = uri.to_string();
178            Self {
179                endpoint: get_endpoint(uri),
180                trace_layer: build_trace_layer(base_url, None),
181                options: (),
182            }
183        }
184
185        #[cfg(not(feature = "tracing"))]
186        return Self {
187            endpoint: get_endpoint(uri),
188            options: (),
189        };
190    }
191}
192
// The base service that `ChannelBuilder::build` hands to `IntoService::into_service`:
// the trace-wrapped channel when the `tracing` feature is enabled, the bare `Channel` otherwise.
#[cfg(feature = "tracing")]
type TargetService = CustomTraceService;
#[cfg(not(feature = "tracing"))]
type TargetService = Channel;
197
198impl<O> ChannelBuilder<O>
199where
200    O: IntoService<TargetService>,
201{
202    /// Wrap the channel with a timeout.
203    #[must_use]
204    pub fn with_timeout(mut self, timeout: Duration) -> Self {
205        self.endpoint = self.endpoint.timeout(timeout);
206        self
207    }
208
209    /// Wrap the channel with the given [`RefreshLayer`].
210    pub fn with_refresh_layer<T>(
211        self,
212        layer: RefreshLayer<T>,
213    ) -> ChannelBuilder<RefreshOptions<O, T>>
214    where
215        T: TokenRefresher + Clone + Send + Sync,
216    {
217        #[cfg(feature = "tracing")]
218        return ChannelBuilder {
219            endpoint: self.endpoint,
220            trace_layer: self.trace_layer,
221            options: RefreshOptions {
222                layer,
223                other: self.options,
224            },
225        };
226        #[cfg(not(feature = "tracing"))]
227        return ChannelBuilder {
228            endpoint: self.endpoint,
229            options: RefreshOptions {
230                layer,
231                other: self.options,
232            },
233        };
234    }
235
236    /// Wrap the channel with QCS authentication using the given [`TokenRefresher`].
237    pub fn with_token_refresher<T>(self, refresher: T) -> ChannelBuilder<RefreshOptions<O, T>>
238    where
239        T: TokenRefresher + Clone + Send + Sync,
240    {
241        self.with_refresh_layer(RefreshLayer::with_refresher(refresher))
242    }
243
244    /// Wrap the channel with QCS authentication for the given [`ClientConfiguration`].
245    pub fn with_qcs_config(
246        self,
247        config: ClientConfiguration,
248    ) -> ChannelBuilder<RefreshOptions<O, ClientConfiguration>> {
249        #[cfg(feature = "tracing")]
250        {
251            let base_url = self.endpoint.uri().to_string();
252            let trace_layer = build_trace_layer(base_url, config.tracing_configuration());
253            let mut builder = self.with_token_refresher(config);
254            builder.trace_layer = trace_layer;
255            builder
256        }
257        #[cfg(not(feature = "tracing"))]
258        {
259            self.with_token_refresher(config)
260        }
261    }
262
263    /// Wrap the channel with QCS authentication for the given QCS profile.
264    ///
265    /// # Errors
266    ///
267    /// Returns a [`LoadError`] if the profile cannot be loaded.
268    pub fn with_qcs_profile(
269        self,
270        profile: Option<String>,
271    ) -> Result<ChannelBuilder<RefreshOptions<O, ClientConfiguration>>, LoadError> {
272        let config = match profile {
273            Some(profile) => ClientConfiguration::load_profile(profile)?,
274            None => ClientConfiguration::load_default()?,
275        };
276
277        Ok(self.with_qcs_config(config))
278    }
279
280    /// Wrap the channel with the given [`RetryLayer`].
281    pub fn with_retry_layer(self, layer: RetryLayer) -> ChannelBuilder<RetryOptions<O>> {
282        #[cfg(feature = "tracing")]
283        return ChannelBuilder {
284            endpoint: self.endpoint,
285            trace_layer: self.trace_layer,
286            options: RetryOptions {
287                layer,
288                other: self.options,
289            },
290        };
291        #[cfg(not(feature = "tracing"))]
292        return ChannelBuilder {
293            endpoint: self.endpoint,
294            options: RetryOptions {
295                layer,
296                other: self.options,
297            },
298        };
299    }
300
301    /// Wrap the channel with the given [`ExponentialBackoff`] configuration.
302    pub fn with_retry_backoff(
303        self,
304        backoff: ExponentialBackoff,
305    ) -> ChannelBuilder<RetryOptions<O>> {
306        self.with_retry_layer(RetryLayer { backoff })
307    }
308
309    /// Wrap the channel with the default retry logic. See [`default_backoff`].
310    pub fn with_default_retry(self) -> ChannelBuilder<RetryOptions<O>> {
311        self.with_retry_backoff(default_backoff())
312    }
313
314    /// Build the [`Channel`]
315    ///
316    /// # Errors
317    ///
318    /// Returns a [`ChannelError`] if the service cannot be built.
319    #[allow(clippy::result_large_err)]
320    pub fn build(self) -> Result<O::Service, ChannelError> {
321        let channel = get_channel_with_endpoint(&self.endpoint)?;
322        #[cfg(feature = "tracing")]
323        {
324            let traced_channel = self.trace_layer.layer(channel);
325            Ok(self.options.into_service(traced_channel))
326        }
327
328        #[cfg(not(feature = "tracing"))]
329        Ok(self.options.into_service(channel))
330    }
331}
332
333/// Parse a string as a URI.
334///
335/// This serves as a helper to avoid consumers needing to create a new error just to include this.
336///
337/// # Errors
338///
339/// [`Error::InvalidUri`] if the string is an invalid URI.
340#[allow(clippy::result_large_err)]
341pub fn parse_uri(s: &str) -> Result<Uri, Error<TokenError>> {
342    s.parse().map_err(Error::from)
343}
344
345/// Get an [`Endpoint`] for the given [`Uri`]
346#[allow(clippy::missing_panics_doc)]
347pub fn get_endpoint(uri: Uri) -> Endpoint {
348    Channel::builder(uri)
349        .user_agent(concat!(
350            "QCS gRPC Client (Rust)/",
351            env!("CARGO_PKG_VERSION")
352        ))
353        .expect("user agent string should be valid")
354        .tls_config(ClientTlsConfig::new().with_enabled_roots())
355        .expect("tls setup should succeed")
356}
357
358/// Get an [`Endpoint`] for the given [`Uri`] and timeout.
359pub fn get_endpoint_with_timeout(uri: Uri, timeout: Option<Duration>) -> Endpoint {
360    if let Some(duration) = timeout {
361        get_endpoint(uri).timeout(duration)
362    } else {
363        get_endpoint(uri)
364    }
365}
366
367/// Fetch the env var named for `key` and parse as a `Uri`.
368/// Tries the original casing, then the full lowercasing of `key`.
369fn get_env_uri(key: &str) -> Result<Option<Uri>, InvalidUri> {
370    std::env::var(key)
371        .or_else(|_| std::env::var(key.to_lowercase()))
372        .ok()
373        .map(Uri::try_from)
374        .transpose()
375}
376
377/// Parse the authentication from `uri` into proxy `Auth`, if present.
378fn get_uri_socks_auth(uri: &Uri) -> Result<Option<Auth>, url::ParseError> {
379    let full_url = uri.to_string().parse::<Url>()?;
380    let user = full_url.username();
381    let auth = if user.is_empty() {
382        None
383    } else {
384        let pass = full_url.password().unwrap_or_default();
385        Some(Auth::new(user, pass))
386    };
387    Ok(auth)
388}
389
390/// Get a [`Channel`] to the given [`Uri`].
391/// Sets up things like user agent without setting up QCS credentials.
392///
393/// This channel will be configured to route requests through proxies defined by
394/// `HTTPS_PROXY` and/or `HTTP_PROXY` environment variables, if they are defined.
395/// The variable names can be all-uppercase or all-lowercase, but the all-uppercase
396/// variants will take precedence. Supported proxy schemes are `http`, `https`, and `socks5`.
397///
398/// Proxy configuration caveats:
399/// - If both variables are defined, neither can be a `socks5` proxy, unless they are both the same value.
400/// - If only one variable is defined, and it is a `socks5` proxy, *all* traffic will be routed through it.
401///
402/// # Errors
403///
404/// See [`ChannelError`].
405#[allow(clippy::result_large_err)]
406pub fn get_channel(uri: Uri) -> Result<Channel, ChannelError> {
407    let endpoint = get_endpoint(uri);
408    get_channel_with_endpoint(&endpoint)
409}
410
411/// Get a [`Channel`] to the given [`Uri`], with an optional timeout. If set to [`None`], no timeout is
412/// used.
413/// Sets up things like user agent without setting up QCS credentials.
414///
415/// This channel will be configured to route requests through proxies defined by
416/// `HTTPS_PROXY` and/or `HTTP_PROXY` environment variables, if they are defined.
417/// The variable names can be all-uppercase or all-lowercase, but the all-uppercase
418/// variants will take precedence. Supported proxy schemes are `http`, `https`, and `socks5`.
419///
420/// Proxy configuration caveats:
421/// - If both variables are defined, neither can be a `socks5` proxy, unless they are both the same value.
422/// - If only one variable is defined, and it is a `socks5` proxy, *all* traffic will be routed through it.
423///
424/// # Errors
425///
426/// See [`ChannelError`].
427#[allow(clippy::result_large_err)]
428pub fn get_channel_with_timeout(
429    uri: Uri,
430    timeout: Option<Duration>,
431) -> Result<Channel, ChannelError> {
432    let endpoint = get_endpoint_with_timeout(uri, timeout);
433    get_channel_with_endpoint(&endpoint)
434}
435
436/// Get a [`Channel`] to the given [`Endpoint`]. Useful if [`get_channel`] or
437/// [`get_channel_with_timeout`] don't provide the configurability you need.
438///
439/// Use [`get_endpoint`] or [`get_endpoint_with_timeout`] to get a starting
440/// [`Endpoint`].
441///
442/// This channel will be configured to route requests through proxies defined by
443/// `HTTPS_PROXY` and/or `HTTP_PROXY` environment variables, if they are defined.
444/// The variable names can be all-uppercase or all-lowercase, but the all-uppercase
445/// variants will take precedence. Supported proxy schemes are `http`, `https`, and `socks5`.
446///
447/// Proxy configuration caveats:
448/// - If both variables are defined, neither can be a `socks5` proxy, unless they are both the same value.
449/// - If only one variable is defined, and it is a `socks5` proxy, *all* traffic will be routed through it.
450///
451/// # Errors
452///
453/// Returns a [`ChannelError`] if the channel cannot be constructed.
454#[allow(
455    clippy::similar_names,
456    reason = "http(s)_proxy are similar but precise in this case"
457)]
458#[allow(clippy::result_large_err)]
459pub fn get_channel_with_endpoint(endpoint: &Endpoint) -> Result<Channel, ChannelError> {
460    let https_proxy = get_env_uri("HTTPS_PROXY")?;
461    let http_proxy = get_env_uri("HTTP_PROXY")?;
462
463    let mut connector = HttpConnector::new();
464    connector.enforce_http(false);
465
466    let connect_to = |uri: http::Uri, intercept: Intercept| {
467        let connector = connector.clone();
468        match uri.scheme_str() {
469            Some("socks5") => {
470                let socks_connector = SocksConnector {
471                    auth: get_uri_socks_auth(&uri)?,
472                    proxy_addr: uri,
473                    connector,
474                };
475                Ok(endpoint.connect_with_connector_lazy(socks_connector))
476            }
477            Some("https" | "http") => {
478                let is_http = uri.scheme() == Some(&http::uri::Scheme::HTTP);
479                let proxy = Proxy::new(intercept, uri);
480                let mut proxy_connector = ProxyConnector::from_proxy(connector, proxy)?;
481                if is_http {
482                    proxy_connector.set_tls(None);
483                }
484                Ok(endpoint.connect_with_connector_lazy(proxy_connector))
485            }
486            scheme => Err(ChannelError::UnsupportedProtocol(scheme.map(String::from))),
487        }
488    };
489
490    let channel = match (https_proxy, http_proxy) {
491        // no proxies, default behavior
492        (None, None) => endpoint.connect_lazy(),
493
494        // either proxy may use https/http, or socks.
495        (Some(https_proxy), None) => connect_to(https_proxy, Intercept::Https)?,
496        (None, Some(http_proxy)) => connect_to(http_proxy, Intercept::Http)?,
497
498        // both proxies are set. If they are the same, they can be socks5. If there are different, they
499        // must both be `https?` URIs in order to use the same `ProxyConnector`.
500        (Some(https_proxy), Some(http_proxy)) => {
501            if https_proxy == http_proxy {
502                connect_to(https_proxy, Intercept::All)?
503            } else {
504                let accepted = [https_proxy.scheme_str(), http_proxy.scheme_str()]
505                    .into_iter()
506                    .all(|scheme| matches!(scheme, Some("https" | "http")));
507                if accepted {
508                    let mut proxy_connector = ProxyConnector::new(connector)?;
509                    proxy_connector.extend_proxies(vec![
510                        Proxy::new(Intercept::Https, https_proxy),
511                        Proxy::new(Intercept::Http, http_proxy),
512                    ]);
513                    endpoint.connect_with_connector_lazy(proxy_connector)
514                } else {
515                    return Err(ChannelError::Mismatch {
516                        https_proxy,
517                        http_proxy,
518                    });
519                }
520            }
521        }
522    };
523
524    Ok(channel)
525}
526
527/// Get a [`Channel`] to the given [`Uri`] with QCS authentication set up already.
528///
529/// # Errors
530///
531/// See [`Error`]
532#[allow(clippy::result_large_err)]
533pub fn get_wrapped_channel(
534    uri: Uri,
535) -> Result<RefreshService<Channel, ClientConfiguration>, Error<TokenError>> {
536    wrap_channel(get_channel(uri)?)
537}
538
539/// Set up the given `channel` with QCS authentication.
540#[must_use]
541pub fn wrap_channel_with<C>(
542    channel: C,
543    config: ClientConfiguration,
544) -> RefreshService<C, ClientConfiguration>
545where
546    C: GrpcService<Body>,
547{
548    ServiceBuilder::new()
549        .layer(RefreshLayer::with_config(config))
550        .service(channel)
551}
552
553/// Set up the given `channel` which will automatically
554/// attempt to refresh its access token when a request fails
555/// do to an expired token
556pub fn wrap_channel_with_token_refresher<C, T>(
557    channel: C,
558    token_refresher: T,
559) -> RefreshService<C, T>
560where
561    C: GrpcService<Body>,
562    T: TokenRefresher + Clone + Send + Sync,
563{
564    ServiceBuilder::new()
565        .layer(RefreshLayer::with_refresher(token_refresher))
566        .service(channel)
567}
568
569/// Set up the given `channel` with QCS authentication.
570///
571/// # Errors
572///
573/// See [`Error`]
574#[allow(clippy::result_large_err)]
575pub fn wrap_channel<C>(
576    channel: C,
577) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
578where
579    C: GrpcService<Body>,
580{
581    Ok(wrap_channel_with(channel, {
582        ClientConfiguration::load_default()?
583    }))
584}
585
586/// Set up the given `channel` with QCS authentication.
587///
588/// # Errors
589///
590/// See [`Error`]
591#[allow(clippy::result_large_err)]
592pub fn wrap_channel_with_profile<C>(
593    channel: C,
594    profile: String,
595) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
596where
597    C: GrpcService<Body>,
598{
599    Ok(wrap_channel_with(
600        channel,
601        ClientConfiguration::load_profile(profile)?,
602    ))
603}
604
605/// Add exponential backoff retry logic to the `channel`.
606pub fn wrap_channel_with_retry<C>(channel: C) -> RetryService<C>
607where
608    C: GrpcService<Body>,
609{
610    ServiceBuilder::new()
611        .layer(RetryLayer::default())
612        .service(channel)
613}
614
#[cfg(feature = "tracing")]
/// Add a tracing layer with OpenTelemetry semantics to the `channel`.
///
/// The layer is built from `base_url` and the given tracing `configuration`.
pub fn wrap_channel_with_tracing(
    channel: Channel,
    base_url: String,
    configuration: TracingConfiguration,
) -> CustomTraceService {
    let trace_layer = build_trace_layer(base_url, Some(&configuration));
    trace_layer.layer(channel)
}