qcs_api_client_grpc/tonic/
channel.rs

1//! Utilities for creating and configuring gRPC channels.
2//!
3//! The [`ChannelBuilder`] is the primary entry point for configuring a gRPC channel.
4use std::time::Duration;
5
6use backoff::ExponentialBackoff;
7use http::{uri::InvalidUri, Uri};
8use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
9use hyper_socks2::{Auth, SocksConnector};
10use hyper_util::client::legacy::connect::HttpConnector;
11use tonic::{
12    body::BoxBody,
13    client::GrpcService,
14    transport::{Channel, ClientTlsConfig, Endpoint},
15};
16use tower::{Layer, ServiceBuilder};
17use url::Url;
18
19use qcs_api_client_common::{
20    backoff::{self, default_backoff},
21    configuration::{ClientConfiguration, LoadError, TokenError, TokenRefresher},
22};
23
24#[cfg(feature = "tracing")]
25use qcs_api_client_common::tracing_configuration::TracingConfiguration;
26
27#[cfg(feature = "tracing")]
28use super::trace::{build_trace_layer, CustomTraceLayer, CustomTraceService};
29use super::{Error, RefreshLayer, RefreshService, RetryLayer, RetryService};
30
/// Errors that may occur when configuring a channel connection.
///
/// Most variants are produced by [`get_channel_with_endpoint`] while it reads the proxy
/// environment variables and builds the matching connector.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ChannelError {
    /// Failed to parse a string as a [`Uri`].
    #[error("Failed to parse URI: {0}")]
    InvalidUri(#[from] InvalidUri),
    /// Failed to parse a URI as a [`Url`]. The URL form is used to derive the proxy
    /// username and password.
    #[error("Failed to parse URL: {0}")]
    InvalidUrl(#[from] url::ParseError),
    /// The proxy URI has no scheme, or a scheme other than `http`, `https`, or `socks5`.
    /// Carries the offending scheme, if there was one.
    #[error("Protocol is missing or not supported: {0:?}")]
    UnsupportedProtocol(Option<String>),
    /// Proxy ssl verification failed.
    ///
    /// Wraps a [`std::io::Error`]; presumably raised while constructing the HTTP(S) proxy
    /// connector -- confirm against `hyper_proxy2`.
    #[error("HTTP proxy ssl verification failed: {0}")]
    SslFailure(#[from] std::io::Error),
    /// Proxy targets do not agree: both an HTTPS and an HTTP proxy are configured with
    /// different URIs, and at least one of them is not an `http`/`https` proxy.
    #[error("Cannot set separate https and http proxies if one of them is socks5")]
    Mismatch {
        /// The URI of the HTTPS proxy.
        https_proxy: Uri,
        /// The URI of the HTTP proxy.
        http_proxy: Uri,
    },
}
56
/// Defines the logic for turning some object (typically a set of builder options) into a
/// [`GrpcService`] by wrapping a base channel of type `C`.
pub trait IntoService<C: GrpcService<BoxBody>> {
    /// The service type that will be returned.
    type Service: GrpcService<BoxBody>;

    /// Convert the object into a service, consuming both the object and the `channel` it wraps.
    fn into_service(self, channel: C) -> Self::Service;
}
65
66impl<C> IntoService<C> for ()
67where
68    C: GrpcService<BoxBody>,
69{
70    type Service = C;
71    fn into_service(self, channel: C) -> Self::Service {
72        channel
73    }
74}
75
/// Options for configuring QCS authentication.
///
/// `O` is the set of options this one is stacked on top of; `R` is the token refresher used
/// by the refresh layer.
#[derive(Clone, Debug)]
pub struct RefreshOptions<O, R>
where
    R: TokenRefresher + Clone + Send + Sync,
{
    // The layer that wraps the inner service with token-refresh behavior.
    layer: RefreshLayer<R>,
    // The previously configured options; they are turned into a service first and the
    // refresh layer is then applied around the result.
    other: O,
}
85
86impl<T> From<T> for RefreshOptions<(), T>
87where
88    T: TokenRefresher + Clone + Send + Sync,
89{
90    fn from(refresher: T) -> Self {
91        Self {
92            layer: RefreshLayer::with_refresher(refresher),
93            other: (),
94        }
95    }
96}
97
98impl<C, T, O> IntoService<C> for RefreshOptions<O, T>
99where
100    C: GrpcService<BoxBody>,
101    O: IntoService<C>,
102    O::Service: GrpcService<BoxBody>,
103    RefreshService<O::Service, T>: GrpcService<BoxBody>,
104    T: TokenRefresher + Clone + Send + Sync + 'static,
105{
106    type Service = RefreshService<O::Service, T>;
107    fn into_service(self, channel: C) -> Self::Service {
108        let service = self.other.into_service(channel);
109        self.layer.layer(service)
110    }
111}
112
/// Options for configuring retry logic.
///
/// `O` is the set of options this one is stacked on top of (defaults to `()`, i.e. none).
#[derive(Clone, Debug)]
pub struct RetryOptions<O = ()> {
    // The layer that wraps the inner service with retry behavior.
    layer: RetryLayer,
    // The previously configured options; they are turned into a service first and the
    // retry layer is then applied around the result.
    other: O,
}
119
120impl From<ExponentialBackoff> for RetryOptions<()> {
121    fn from(backoff: ExponentialBackoff) -> Self {
122        Self {
123            layer: RetryLayer { backoff },
124            other: (),
125        }
126    }
127}
128
129impl<C, O> IntoService<C> for RetryOptions<O>
130where
131    C: GrpcService<BoxBody>,
132    O: IntoService<C>,
133    O::Service: GrpcService<BoxBody>,
134    RetryService<O::Service>: GrpcService<BoxBody>,
135{
136    type Service = RetryService<O::Service>;
137    fn into_service(self, channel: C) -> Self::Service {
138        let service = self.other.into_service(channel);
139        self.layer.layer(service)
140    }
141}
142
/// Builder for configuring a [`Channel`].
///
/// `O` accumulates the stacked options (refresh, retry, ...) that are applied around the
/// channel when the builder is built; it defaults to `()`, meaning no extra layers.
#[derive(Clone, Debug)]
pub struct ChannelBuilder<O = ()> {
    // The endpoint the channel will connect to.
    endpoint: Endpoint,
    // Trace layer applied directly around the channel; only present with the `tracing` feature.
    #[cfg(feature = "tracing")]
    trace_layer: CustomTraceLayer,
    // The stacked layer options, converted to a service via `IntoService`.
    options: O,
}
151
152impl From<Endpoint> for ChannelBuilder<()> {
153    fn from(endpoint: Endpoint) -> Self {
154        #[cfg(feature = "tracing")]
155        {
156            let base_url = endpoint.uri().to_string();
157            Self {
158                endpoint,
159                trace_layer: build_trace_layer(base_url, None),
160                options: (),
161            }
162        }
163
164        #[cfg(not(feature = "tracing"))]
165        return Self {
166            endpoint,
167            options: (),
168        };
169    }
170}
171
172impl ChannelBuilder<()> {
173    /// Create a [`ChannelBuilder`] using the given [`Uri`]
174    pub fn from_uri(uri: Uri) -> Self {
175        #[cfg(feature = "tracing")]
176        {
177            let base_url = uri.to_string();
178            Self {
179                endpoint: get_endpoint(uri),
180                trace_layer: build_trace_layer(base_url, None),
181                options: (),
182            }
183        }
184
185        #[cfg(not(feature = "tracing"))]
186        return Self {
187            endpoint: get_endpoint(uri),
188            options: (),
189        };
190    }
191}
192
/// The innermost service that [`ChannelBuilder::build`] hands to [`IntoService::into_service`]:
/// the trace-wrapped channel when the `tracing` feature is enabled.
#[cfg(feature = "tracing")]
type TargetService = CustomTraceService;
/// The innermost service that [`ChannelBuilder::build`] hands to [`IntoService::into_service`]:
/// the bare [`Channel`] when the `tracing` feature is disabled.
#[cfg(not(feature = "tracing"))]
type TargetService = Channel;
197
198impl<O> ChannelBuilder<O>
199where
200    O: IntoService<TargetService>,
201{
202    /// Wrap the channel with a timeout.
203    #[must_use]
204    pub fn with_timeout(mut self, timeout: Duration) -> Self {
205        self.endpoint = self.endpoint.timeout(timeout);
206        self
207    }
208
209    /// Wrap the channel with the given [`RefreshLayer`].
210    pub fn with_refresh_layer<T>(
211        self,
212        layer: RefreshLayer<T>,
213    ) -> ChannelBuilder<RefreshOptions<O, T>>
214    where
215        T: TokenRefresher + Clone + Send + Sync,
216    {
217        #[cfg(feature = "tracing")]
218        return ChannelBuilder {
219            endpoint: self.endpoint,
220            trace_layer: self.trace_layer,
221            options: RefreshOptions {
222                layer,
223                other: self.options,
224            },
225        };
226        #[cfg(not(feature = "tracing"))]
227        return ChannelBuilder {
228            endpoint: self.endpoint,
229            options: RefreshOptions {
230                layer,
231                other: self.options,
232            },
233        };
234    }
235
236    /// Wrap the channel with QCS authentication using the given [`TokenRefresher`].
237    pub fn with_token_refresher<T>(self, refresher: T) -> ChannelBuilder<RefreshOptions<O, T>>
238    where
239        T: TokenRefresher + Clone + Send + Sync,
240    {
241        self.with_refresh_layer(RefreshLayer::with_refresher(refresher))
242    }
243
244    /// Wrap the channel with QCS authentication for the given [`ClientConfiguration`].
245    pub fn with_qcs_config(
246        self,
247        config: ClientConfiguration,
248    ) -> ChannelBuilder<RefreshOptions<O, ClientConfiguration>> {
249        #[cfg(feature = "tracing")]
250        {
251            let base_url = self.endpoint.uri().to_string();
252            let trace_layer = build_trace_layer(base_url, config.tracing_configuration());
253            let mut builder = self.with_token_refresher(config);
254            builder.trace_layer = trace_layer;
255            builder
256        }
257        #[cfg(not(feature = "tracing"))]
258        {
259            self.with_token_refresher(config)
260        }
261    }
262
263    /// Wrap the channel with QCS authentication for the given QCS profile.
264    ///
265    /// # Errors
266    ///
267    /// Returns a [`LoadError`] if the profile cannot be loaded.
268    pub fn with_qcs_profile(
269        self,
270        profile: Option<String>,
271    ) -> Result<ChannelBuilder<RefreshOptions<O, ClientConfiguration>>, LoadError> {
272        let config = match profile {
273            Some(profile) => ClientConfiguration::load_profile(profile)?,
274            None => ClientConfiguration::load_default()?,
275        };
276
277        Ok(self.with_qcs_config(config))
278    }
279
280    /// Wrap the channel with the given [`RetryLayer`].
281    pub fn with_retry_layer(self, layer: RetryLayer) -> ChannelBuilder<RetryOptions<O>> {
282        #[cfg(feature = "tracing")]
283        return ChannelBuilder {
284            endpoint: self.endpoint,
285            trace_layer: self.trace_layer,
286            options: RetryOptions {
287                layer,
288                other: self.options,
289            },
290        };
291        #[cfg(not(feature = "tracing"))]
292        return ChannelBuilder {
293            endpoint: self.endpoint,
294            options: RetryOptions {
295                layer,
296                other: self.options,
297            },
298        };
299    }
300
301    /// Wrap the channel with the given [`ExponentialBackoff`] configuration.
302    pub fn with_retry_backoff(
303        self,
304        backoff: ExponentialBackoff,
305    ) -> ChannelBuilder<RetryOptions<O>> {
306        self.with_retry_layer(RetryLayer { backoff })
307    }
308
309    /// Wrap the channel with the default retry logic. See [`default_backoff`].
310    pub fn with_default_retry(self) -> ChannelBuilder<RetryOptions<O>> {
311        self.with_retry_backoff(default_backoff())
312    }
313
314    /// Build the [`Channel`]
315    ///
316    /// # Errors
317    ///
318    /// Returns a [`ChannelError`] if the service cannot be built.
319    pub fn build(self) -> Result<O::Service, ChannelError> {
320        let channel = get_channel_with_endpoint(&self.endpoint)?;
321        #[cfg(feature = "tracing")]
322        {
323            let traced_channel = self.trace_layer.layer(channel);
324            Ok(self.options.into_service(traced_channel))
325        }
326
327        #[cfg(not(feature = "tracing"))]
328        Ok(self.options.into_service(channel))
329    }
330}
331
332/// Parse a string as a URI.
333///
334/// This serves as a helper to avoid consumers needing to create a new error just to include this.
335///
336/// # Errors
337///
338/// [`Error::InvalidUri`] if the string is an invalid URI.
339pub fn parse_uri(s: &str) -> Result<Uri, Error<TokenError>> {
340    s.parse().map_err(Error::from)
341}
342
343/// Get an [`Endpoint`] for the given [`Uri`]
344#[allow(clippy::missing_panics_doc)]
345pub fn get_endpoint(uri: Uri) -> Endpoint {
346    Channel::builder(uri)
347        .user_agent(concat!(
348            "QCS gRPC Client (Rust)/",
349            env!("CARGO_PKG_VERSION")
350        ))
351        .expect("user agent string should be valid")
352        .tls_config(ClientTlsConfig::new().with_enabled_roots())
353        .expect("tls setup should succeed")
354}
355
356/// Get an [`Endpoint`] for the given [`Uri`] and timeout.
357pub fn get_endpoint_with_timeout(uri: Uri, timeout: Option<Duration>) -> Endpoint {
358    if let Some(duration) = timeout {
359        get_endpoint(uri).timeout(duration)
360    } else {
361        get_endpoint(uri)
362    }
363}
364
365/// Fetch the env var named for `key` and parse as a `Uri`.
366/// Tries the original casing, then the full lowercasing of `key`.
367fn get_env_uri(key: &str) -> Result<Option<Uri>, InvalidUri> {
368    std::env::var(key)
369        .or_else(|_| std::env::var(key.to_lowercase()))
370        .ok()
371        .map(Uri::try_from)
372        .transpose()
373}
374
375/// Parse the authentication from `uri` into proxy `Auth`, if present.
376fn get_uri_socks_auth(uri: &Uri) -> Result<Option<Auth>, url::ParseError> {
377    let full_url = uri.to_string().parse::<Url>()?;
378    let user = full_url.username();
379    let auth = if user.is_empty() {
380        None
381    } else {
382        let pass = full_url.password().unwrap_or_default();
383        Some(Auth::new(user, pass))
384    };
385    Ok(auth)
386}
387
388/// Get a [`Channel`] to the given [`Uri`].
389/// Sets up things like user agent without setting up QCS credentials.
390///
391/// This channel will be configured to route requests through proxies defined by
392/// `HTTPS_PROXY` and/or `HTTP_PROXY` environment variables, if they are defined.
393/// The variable names can be all-uppercase or all-lowercase, but the all-uppercase
394/// variants will take precedence. Supported proxy schemes are `http`, `https`, and `socks5`.
395///
396/// Proxy configuration caveats:
397/// - If both variables are defined, neither can be a `socks5` proxy, unless they are both the same value.
398/// - If only one variable is defined, and it is a `socks5` proxy, *all* traffic will be routed through it.
399///
400/// # Errors
401///
402/// See [`ChannelError`].
403pub fn get_channel(uri: Uri) -> Result<Channel, ChannelError> {
404    let endpoint = get_endpoint(uri);
405    get_channel_with_endpoint(&endpoint)
406}
407
408/// Get a [`Channel`] to the given [`Uri`], with an optional timeout. If set to [`None`], no timeout is
409/// used.
410/// Sets up things like user agent without setting up QCS credentials.
411///
412/// This channel will be configured to route requests through proxies defined by
413/// `HTTPS_PROXY` and/or `HTTP_PROXY` environment variables, if they are defined.
414/// The variable names can be all-uppercase or all-lowercase, but the all-uppercase
415/// variants will take precedence. Supported proxy schemes are `http`, `https`, and `socks5`.
416///
417/// Proxy configuration caveats:
418/// - If both variables are defined, neither can be a `socks5` proxy, unless they are both the same value.
419/// - If only one variable is defined, and it is a `socks5` proxy, *all* traffic will be routed through it.
420///
421/// # Errors
422///
423/// See [`ChannelError`].
424pub fn get_channel_with_timeout(
425    uri: Uri,
426    timeout: Option<Duration>,
427) -> Result<Channel, ChannelError> {
428    let endpoint = get_endpoint_with_timeout(uri, timeout);
429    get_channel_with_endpoint(&endpoint)
430}
431
/// Get a [`Channel`] to the given [`Endpoint`]. Useful if [`get_channel`] or
/// [`get_channel_with_timeout`] don't provide the configurability you need.
///
/// Use [`get_endpoint`] or [`get_endpoint_with_timeout`] to get a starting
/// [`Endpoint`].
///
/// This channel will be configured to route requests through proxies defined by
/// `HTTPS_PROXY` and/or `HTTP_PROXY` environment variables, if they are defined.
/// The variable names can be all-uppercase or all-lowercase, but the all-uppercase
/// variants will take precedence. Supported proxy schemes are `http`, `https`, and `socks5`.
///
/// Proxy configuration caveats:
/// - If both variables are defined, neither can be a `socks5` proxy, unless they are both the same value.
/// - If only one variable is defined, and it is a `socks5` proxy, *all* traffic will be routed through it.
///
/// The channel is created lazily in every case (`connect_lazy` /
/// `connect_with_connector_lazy`).
///
/// # Errors
///
/// Returns a [`ChannelError`] if the channel cannot be constructed: a proxy variable holds
/// an invalid URI, a proxy scheme is missing or unsupported, the proxy connector cannot be
/// created, or two different proxies are configured and one of them is not `http`/`https`.
#[allow(clippy::similar_names)] // http(s)_proxy are similar but precise in this case.
pub fn get_channel_with_endpoint(endpoint: &Endpoint) -> Result<Channel, ChannelError> {
    let https_proxy = get_env_uri("HTTPS_PROXY")?;
    let http_proxy = get_env_uri("HTTP_PROXY")?;

    // Base connector shared by every proxy configuration below.
    let mut connector = HttpConnector::new();
    // Presumably lifts the connector's http-only restriction so that `https` and `socks5`
    // targets are accepted -- see the `HttpConnector::enforce_http` documentation.
    connector.enforce_http(false);

    // Build a lazily-connected channel that goes through the single proxy at `uri`.
    // `intercept` selects which traffic an HTTP(S) proxy applies to; it is not used for socks5.
    let connect_to = |uri: http::Uri, intercept: Intercept| {
        // Clone so the original `connector` stays available for the two-proxy branch below.
        let connector = connector.clone();
        match uri.scheme_str() {
            Some("socks5") => {
                let socks_connector = SocksConnector {
                    // Credentials, if any, come from the user-info part of the proxy URI.
                    auth: get_uri_socks_auth(&uri)?,
                    proxy_addr: uri,
                    connector,
                };
                Ok(endpoint.connect_with_connector_lazy(socks_connector))
            }
            Some("https" | "http") => {
                // Record the scheme before `uri` is moved into the `Proxy`.
                let is_http = uri.scheme() == Some(&http::uri::Scheme::HTTP);
                let proxy = Proxy::new(intercept, uri);
                let mut proxy_connector = ProxyConnector::from_proxy(connector, proxy)?;
                if is_http {
                    // Plain-http proxy: remove the TLS configuration from the proxy connector.
                    proxy_connector.set_tls(None);
                }
                Ok(endpoint.connect_with_connector_lazy(proxy_connector))
            }
            // Missing scheme, or anything other than socks5/https/http.
            scheme => Err(ChannelError::UnsupportedProtocol(scheme.map(String::from))),
        }
    };

    let channel = match (https_proxy, http_proxy) {
        // no proxies, default behavior
        (None, None) => endpoint.connect_lazy(),

        // either proxy may use https/http, or socks.
        (Some(https_proxy), None) => connect_to(https_proxy, Intercept::Https)?,
        (None, Some(http_proxy)) => connect_to(http_proxy, Intercept::Http)?,

        // both proxies are set. If they are the same, they can be socks5. If they are different, they
        // must both be `https?` URIs in order to use the same `ProxyConnector`.
        (Some(https_proxy), Some(http_proxy)) => {
            if https_proxy == http_proxy {
                // One proxy for everything.
                connect_to(https_proxy, Intercept::All)?
            } else {
                let accepted = [https_proxy.scheme_str(), http_proxy.scheme_str()]
                    .into_iter()
                    .all(|scheme| matches!(scheme, Some("https" | "http")));
                if accepted {
                    // A single connector carrying both proxies, each intercepting its own scheme.
                    let mut proxy_connector = ProxyConnector::new(connector)?;
                    proxy_connector.extend_proxies(vec![
                        Proxy::new(Intercept::Https, https_proxy),
                        Proxy::new(Intercept::Http, http_proxy),
                    ]);
                    endpoint.connect_with_connector_lazy(proxy_connector)
                } else {
                    return Err(ChannelError::Mismatch {
                        https_proxy,
                        http_proxy,
                    });
                }
            }
        }
    };

    Ok(channel)
}
518
519/// Get a [`Channel`] to the given [`Uri`] with QCS authentication set up already.
520///
521/// # Errors
522///
523/// See [`Error`]
524pub fn get_wrapped_channel(
525    uri: Uri,
526) -> Result<RefreshService<Channel, ClientConfiguration>, Error<TokenError>> {
527    wrap_channel(get_channel(uri)?)
528}
529
530/// Set up the given `channel` with QCS authentication.
531#[must_use]
532pub fn wrap_channel_with<C>(
533    channel: C,
534    config: ClientConfiguration,
535) -> RefreshService<C, ClientConfiguration>
536where
537    C: GrpcService<BoxBody>,
538{
539    ServiceBuilder::new()
540        .layer(RefreshLayer::with_config(config))
541        .service(channel)
542}
543
544/// Set up the given `channel` which will automatically
545/// attempt to refresh its access token when a request fails
546/// do to an expired token
547pub fn wrap_channel_with_token_refresher<C, T>(
548    channel: C,
549    token_refresher: T,
550) -> RefreshService<C, T>
551where
552    C: GrpcService<BoxBody>,
553    T: TokenRefresher + Clone + Send + Sync,
554{
555    ServiceBuilder::new()
556        .layer(RefreshLayer::with_refresher(token_refresher))
557        .service(channel)
558}
559
560/// Set up the given `channel` with QCS authentication.
561///
562/// # Errors
563///
564/// See [`Error`]
565pub fn wrap_channel<C>(
566    channel: C,
567) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
568where
569    C: GrpcService<BoxBody>,
570{
571    Ok(wrap_channel_with(channel, {
572        ClientConfiguration::load_default()?
573    }))
574}
575
576/// Set up the given `channel` with QCS authentication.
577///
578/// # Errors
579///
580/// See [`Error`]
581pub fn wrap_channel_with_profile<C>(
582    channel: C,
583    profile: String,
584) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
585where
586    C: GrpcService<BoxBody>,
587{
588    Ok(wrap_channel_with(
589        channel,
590        ClientConfiguration::load_profile(profile)?,
591    ))
592}
593
594/// Add exponential backoff retry logic to the `channel`.
595pub fn wrap_channel_with_retry<C>(channel: C) -> RetryService<C>
596where
597    C: GrpcService<BoxBody>,
598{
599    ServiceBuilder::new()
600        .layer(RetryLayer::default())
601        .service(channel)
602}
603
#[cfg(feature = "tracing")]
/// Wrap the `channel` with a tracing layer that follows OpenTelemetry semantics, built from
/// `base_url` and the given tracing `configuration`.
pub fn wrap_channel_with_tracing(
    channel: Channel,
    base_url: String,
    configuration: TracingConfiguration,
) -> CustomTraceService {
    let trace_layer = build_trace_layer(base_url, Some(&configuration));
    ServiceBuilder::new().layer(trace_layer).service(channel)
}