Skip to main content

qcs_api_client_grpc/tonic/
channel.rs

1//! Utilities for creating and configuring gRPC channels.
2//!
3//! The [`ChannelBuilder`] is the primary entry point for configuring a gRPC channel.
4use std::time::Duration;
5
6use backoff::ExponentialBackoff;
7use http::{uri::InvalidUri, Uri};
8use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
9use hyper_socks2::{Auth, SocksConnector};
10use hyper_util::client::legacy::connect::HttpConnector;
11use tonic::{
12    body::Body,
13    client::GrpcService,
14    transport::{Channel, ClientTlsConfig, Endpoint},
15};
16use tower::{Layer, ServiceBuilder};
17use url::Url;
18
19use qcs_api_client_common::{
20    backoff::{self, default_backoff},
21    configuration::{tokens::TokenRefresher, ClientConfiguration, LoadError, TokenError},
22};
23
24#[cfg(feature = "tracing")]
25use qcs_api_client_common::tracing_configuration::TracingConfiguration;
26
27#[cfg(feature = "tracing")]
28use super::trace::{build_trace_layer, CustomTraceLayer, CustomTraceService};
29use super::{Error, RefreshLayer, RefreshService, RetryLayer, RetryService};
30
31/// Errors that may occur when configuring a channel connection
32#[derive(Debug, thiserror::Error)]
33#[non_exhaustive]
34pub enum ChannelError {
35    /// Failed to parse URI.
36    #[error("Failed to parse URI: {0}")]
37    InvalidUri(#[from] InvalidUri),
38    /// Failed to parse URL. Used to derive user/pass.
39    #[error("Failed to parse URL: {0}")]
40    InvalidUrl(#[from] url::ParseError),
41    /// Unsupported proxy protocol.
42    #[error("Protocol is missing or not supported: {0:?}")]
43    UnsupportedProtocol(Option<String>),
44    /// Proxy ssl verification failed
45    #[error("HTTP proxy ssl verification failed: {0}")]
46    SslFailure(#[from] std::io::Error),
47    /// Proxy targets do not agree
48    #[error("Cannot set separate https and http proxies if one of them is socks5")]
49    Mismatch {
50        /// The URI of the HTTPS proxy.
51        https_proxy: Uri,
52        /// The URI of the HTTP proxy.
53        http_proxy: Uri,
54    },
55}
56
57/// Defines a logic for turning some object into a [`GrpcService`].
58pub trait IntoService<C: GrpcService<Body>> {
59    /// The service type that will be returned.
60    type Service: GrpcService<Body>;
61
62    /// Convert the object into a service.
63    fn into_service(self, channel: C) -> Self::Service;
64}
65
66impl<C> IntoService<C> for ()
67where
68    C: GrpcService<Body>,
69{
70    type Service = C;
71    fn into_service(self, channel: C) -> Self::Service {
72        channel
73    }
74}
75
76/// Options for configuring QCS authentication.
77#[derive(Clone, Debug)]
78pub struct RefreshOptions<O, R>
79where
80    R: TokenRefresher + Clone + Send + Sync,
81{
82    layer: RefreshLayer<R>,
83    other: O,
84}
85
86impl<T> From<T> for RefreshOptions<(), T>
87where
88    T: TokenRefresher + Clone + Send + Sync,
89{
90    fn from(refresher: T) -> Self {
91        Self {
92            layer: RefreshLayer::with_refresher(refresher),
93            other: (),
94        }
95    }
96}
97
98impl<C, T, O> IntoService<C> for RefreshOptions<O, T>
99where
100    C: GrpcService<Body>,
101    O: IntoService<C>,
102    O::Service: GrpcService<Body>,
103    RefreshService<O::Service, T>: GrpcService<Body>,
104    T: TokenRefresher + Clone + Send + Sync + 'static,
105{
106    type Service = RefreshService<O::Service, T>;
107    fn into_service(self, channel: C) -> Self::Service {
108        let service = self.other.into_service(channel);
109        self.layer.layer(service)
110    }
111}
112
113/// Options for configuring retry logic.
114#[derive(Clone, Debug)]
115pub struct RetryOptions<O = ()> {
116    layer: RetryLayer,
117    other: O,
118}
119
120impl From<ExponentialBackoff> for RetryOptions<()> {
121    fn from(backoff: ExponentialBackoff) -> Self {
122        Self {
123            layer: RetryLayer { backoff },
124            other: (),
125        }
126    }
127}
128
129impl<C, O> IntoService<C> for RetryOptions<O>
130where
131    C: GrpcService<Body>,
132    O: IntoService<C>,
133    O::Service: GrpcService<Body>,
134    RetryService<O::Service>: GrpcService<Body>,
135{
136    type Service = RetryService<O::Service>;
137    fn into_service(self, channel: C) -> Self::Service {
138        let service = self.other.into_service(channel);
139        self.layer.layer(service)
140    }
141}
142
143/// Builder for configuring a [`Channel`].
144#[derive(Clone, Debug)]
145pub struct ChannelBuilder<O = ()> {
146    endpoint: Endpoint,
147    #[cfg(feature = "tracing")]
148    trace_layer: CustomTraceLayer,
149    options: O,
150}
151
152impl From<Endpoint> for ChannelBuilder<()> {
153    fn from(endpoint: Endpoint) -> Self {
154        #[cfg(feature = "tracing")]
155        {
156            let base_url = endpoint.uri().to_string();
157            Self {
158                endpoint,
159                trace_layer: build_trace_layer(base_url, None),
160                options: (),
161            }
162        }
163
164        #[cfg(not(feature = "tracing"))]
165        return Self {
166            endpoint,
167            options: (),
168        };
169    }
170}
171
172impl ChannelBuilder<()> {
173    /// Create a [`ChannelBuilder`] using the given [`Uri`]
174    pub fn from_uri(uri: Uri) -> Self {
175        #[cfg(feature = "tracing")]
176        {
177            let base_url = uri.to_string();
178            Self {
179                endpoint: get_endpoint(uri),
180                trace_layer: build_trace_layer(base_url, None),
181                options: (),
182            }
183        }
184
185        #[cfg(not(feature = "tracing"))]
186        return Self {
187            endpoint: get_endpoint(uri),
188            options: (),
189        };
190    }
191}
192
193#[cfg(feature = "tracing")]
194type TargetService = CustomTraceService;
195#[cfg(not(feature = "tracing"))]
196type TargetService = Channel;
197
198impl<O> ChannelBuilder<O>
199where
200    O: IntoService<TargetService>,
201{
202    /// Wrap the channel with a timeout.
203    #[must_use]
204    pub fn with_timeout(mut self, timeout: Duration) -> Self {
205        self.endpoint = self.endpoint.timeout(timeout);
206        self
207    }
208
209    /// Wrap the channel with the given [`RefreshLayer`].
210    pub fn with_refresh_layer<T>(
211        self,
212        layer: RefreshLayer<T>,
213    ) -> ChannelBuilder<RefreshOptions<O, T>>
214    where
215        T: TokenRefresher + Clone + Send + Sync,
216    {
217        #[cfg(feature = "tracing")]
218        return ChannelBuilder {
219            endpoint: self.endpoint,
220            trace_layer: self.trace_layer,
221            options: RefreshOptions {
222                layer,
223                other: self.options,
224            },
225        };
226        #[cfg(not(feature = "tracing"))]
227        return ChannelBuilder {
228            endpoint: self.endpoint,
229            options: RefreshOptions {
230                layer,
231                other: self.options,
232            },
233        };
234    }
235
236    /// Wrap the channel with QCS authentication using the given [`TokenRefresher`].
237    pub fn with_token_refresher<T>(self, refresher: T) -> ChannelBuilder<RefreshOptions<O, T>>
238    where
239        T: TokenRefresher + Clone + Send + Sync,
240    {
241        self.with_refresh_layer(RefreshLayer::with_refresher(refresher))
242    }
243
244    /// Wrap the channel with QCS authentication for the given [`ClientConfiguration`].
245    pub fn with_qcs_config(
246        self,
247        config: ClientConfiguration,
248    ) -> ChannelBuilder<RefreshOptions<O, ClientConfiguration>> {
249        #[cfg(feature = "tracing")]
250        {
251            let base_url = self.endpoint.uri().to_string();
252            let trace_layer = build_trace_layer(base_url, config.tracing_configuration());
253            let mut builder = self.with_token_refresher(config);
254            builder.trace_layer = trace_layer;
255            builder
256        }
257        #[cfg(not(feature = "tracing"))]
258        {
259            self.with_token_refresher(config)
260        }
261    }
262
263    /// Wrap the channel with QCS authentication for the given QCS profile.
264    ///
265    /// # Errors
266    ///
267    /// Returns a [`LoadError`] if the profile cannot be loaded.
268    pub fn with_qcs_profile(
269        self,
270        profile: Option<String>,
271    ) -> Result<ChannelBuilder<RefreshOptions<O, ClientConfiguration>>, LoadError> {
272        let config = match profile {
273            Some(profile) => ClientConfiguration::load_profile(profile)?,
274            None => ClientConfiguration::load_default()?,
275        };
276
277        Ok(self.with_qcs_config(config))
278    }
279
280    /// Wrap the channel with the given [`RetryLayer`].
281    pub fn with_retry_layer(self, layer: RetryLayer) -> ChannelBuilder<RetryOptions<O>> {
282        #[cfg(feature = "tracing")]
283        return ChannelBuilder {
284            endpoint: self.endpoint,
285            trace_layer: self.trace_layer,
286            options: RetryOptions {
287                layer,
288                other: self.options,
289            },
290        };
291        #[cfg(not(feature = "tracing"))]
292        return ChannelBuilder {
293            endpoint: self.endpoint,
294            options: RetryOptions {
295                layer,
296                other: self.options,
297            },
298        };
299    }
300
301    /// Wrap the channel with the given [`ExponentialBackoff`] configuration.
302    pub fn with_retry_backoff(
303        self,
304        backoff: ExponentialBackoff,
305    ) -> ChannelBuilder<RetryOptions<O>> {
306        self.with_retry_layer(RetryLayer { backoff })
307    }
308
309    /// Wrap the channel with the default retry logic. See [`default_backoff`].
310    pub fn with_default_retry(self) -> ChannelBuilder<RetryOptions<O>> {
311        self.with_retry_backoff(default_backoff())
312    }
313
314    /// Build the [`Channel`]
315    ///
316    /// # Errors
317    ///
318    /// Returns a [`ChannelError`] if the service cannot be built.
319    #[allow(clippy::result_large_err)]
320    pub fn build(self) -> Result<O::Service, ChannelError> {
321        let channel = get_channel_with_endpoint(&self.endpoint)?;
322        #[cfg(feature = "tracing")]
323        {
324            let traced_channel = self.trace_layer.layer(channel);
325            Ok(self.options.into_service(traced_channel))
326        }
327
328        #[cfg(not(feature = "tracing"))]
329        Ok(self.options.into_service(channel))
330    }
331}
332
333/// Parse a string as a URI.
334///
335/// This serves as a helper to avoid consumers needing to create a new error just to include this.
336///
337/// # Errors
338///
339/// [`Error::InvalidUri`] if the string is an invalid URI.
340#[allow(clippy::result_large_err)]
341pub fn parse_uri(s: &str) -> Result<Uri, Error<TokenError>> {
342    s.parse().map_err(Error::from)
343}
344
345/// Get an [`Endpoint`] for the given [`Uri`]
346#[allow(clippy::missing_panics_doc)]
347pub fn get_endpoint(uri: Uri) -> Endpoint {
348    Channel::builder(uri)
349        .user_agent(concat!(
350            "QCS gRPC Client (Rust)/",
351            env!("CARGO_PKG_VERSION")
352        ))
353        .expect("user agent string should be valid")
354        .http2_adaptive_window(true)
355        .tls_config(ClientTlsConfig::new().with_enabled_roots())
356        .expect("tls setup should succeed")
357}
358
359/// Get an [`Endpoint`] for the given [`Uri`] and timeout.
360pub fn get_endpoint_with_timeout(uri: Uri, timeout: Option<Duration>) -> Endpoint {
361    if let Some(duration) = timeout {
362        get_endpoint(uri).timeout(duration)
363    } else {
364        get_endpoint(uri)
365    }
366}
367
368/// Fetch the env var named for `key` and parse as a `Uri`.
369/// Tries the original casing, then the full lowercasing of `key`.
370fn get_env_uri(key: &str) -> Result<Option<Uri>, InvalidUri> {
371    std::env::var(key)
372        .or_else(|_| std::env::var(key.to_lowercase()))
373        .ok()
374        .map(Uri::try_from)
375        .transpose()
376}
377
378/// Parse the authentication from `uri` into proxy `Auth`, if present.
379fn get_uri_socks_auth(uri: &Uri) -> Result<Option<Auth>, url::ParseError> {
380    let full_url = uri.to_string().parse::<Url>()?;
381    let user = full_url.username();
382    let auth = if user.is_empty() {
383        None
384    } else {
385        let pass = full_url.password().unwrap_or_default();
386        Some(Auth::new(user, pass))
387    };
388    Ok(auth)
389}
390
391/// Get a [`Channel`] to the given [`Uri`].
392/// Sets up things like user agent without setting up QCS credentials.
393///
394/// This channel will be configured to route requests through proxies defined by
395/// `HTTPS_PROXY` and/or `HTTP_PROXY` environment variables, if they are defined.
396/// The variable names can be all-uppercase or all-lowercase, but the all-uppercase
397/// variants will take precedence. Supported proxy schemes are `http`, `https`, and `socks5`.
398///
399/// Proxy configuration caveats:
400/// - If both variables are defined, neither can be a `socks5` proxy, unless they are both the same value.
401/// - If only one variable is defined, and it is a `socks5` proxy, *all* traffic will be routed through it.
402///
403/// # Errors
404///
405/// See [`ChannelError`].
406#[allow(clippy::result_large_err)]
407pub fn get_channel(uri: Uri) -> Result<Channel, ChannelError> {
408    let endpoint = get_endpoint(uri);
409    get_channel_with_endpoint(&endpoint)
410}
411
412/// Get a [`Channel`] to the given [`Uri`], with an optional timeout. If set to [`None`], no timeout is
413/// used.
414/// Sets up things like user agent without setting up QCS credentials.
415///
416/// This channel will be configured to route requests through proxies defined by
417/// `HTTPS_PROXY` and/or `HTTP_PROXY` environment variables, if they are defined.
418/// The variable names can be all-uppercase or all-lowercase, but the all-uppercase
419/// variants will take precedence. Supported proxy schemes are `http`, `https`, and `socks5`.
420///
421/// Proxy configuration caveats:
422/// - If both variables are defined, neither can be a `socks5` proxy, unless they are both the same value.
423/// - If only one variable is defined, and it is a `socks5` proxy, *all* traffic will be routed through it.
424///
425/// # Errors
426///
427/// See [`ChannelError`].
428#[allow(clippy::result_large_err)]
429pub fn get_channel_with_timeout(
430    uri: Uri,
431    timeout: Option<Duration>,
432) -> Result<Channel, ChannelError> {
433    let endpoint = get_endpoint_with_timeout(uri, timeout);
434    get_channel_with_endpoint(&endpoint)
435}
436
437/// Get a [`Channel`] to the given [`Endpoint`]. Useful if [`get_channel`] or
438/// [`get_channel_with_timeout`] don't provide the configurability you need.
439///
440/// Use [`get_endpoint`] or [`get_endpoint_with_timeout`] to get a starting
441/// [`Endpoint`].
442///
443/// This channel will be configured to route requests through proxies defined by
444/// `HTTPS_PROXY` and/or `HTTP_PROXY` environment variables, if they are defined.
445/// The variable names can be all-uppercase or all-lowercase, but the all-uppercase
446/// variants will take precedence. Supported proxy schemes are `http`, `https`, and `socks5`.
447///
448/// Proxy configuration caveats:
449/// - If both variables are defined, neither can be a `socks5` proxy, unless they are both the same value.
450/// - If only one variable is defined, and it is a `socks5` proxy, *all* traffic will be routed through it.
451///
452/// # Errors
453///
454/// Returns a [`ChannelError`] if the channel cannot be constructed.
455#[allow(
456    clippy::similar_names,
457    reason = "http(s)_proxy are similar but precise in this case"
458)]
459#[allow(clippy::result_large_err)]
460pub fn get_channel_with_endpoint(endpoint: &Endpoint) -> Result<Channel, ChannelError> {
461    let https_proxy = get_env_uri("HTTPS_PROXY")?;
462    let http_proxy = get_env_uri("HTTP_PROXY")?;
463
464    let mut connector = HttpConnector::new();
465    connector.enforce_http(false);
466
467    let connect_to = |uri: http::Uri, intercept: Intercept| {
468        let connector = connector.clone();
469        match uri.scheme_str() {
470            Some("socks5") => {
471                let socks_connector = SocksConnector {
472                    auth: get_uri_socks_auth(&uri)?,
473                    proxy_addr: uri,
474                    connector,
475                };
476                Ok(endpoint.connect_with_connector_lazy(socks_connector))
477            }
478            Some("https" | "http") => {
479                let is_http = uri.scheme() == Some(&http::uri::Scheme::HTTP);
480                let proxy = Proxy::new(intercept, uri);
481                let mut proxy_connector = ProxyConnector::from_proxy(connector, proxy)?;
482                if is_http {
483                    proxy_connector.set_tls(None);
484                }
485                Ok(endpoint.connect_with_connector_lazy(proxy_connector))
486            }
487            scheme => Err(ChannelError::UnsupportedProtocol(scheme.map(String::from))),
488        }
489    };
490
491    let channel = match (https_proxy, http_proxy) {
492        // no proxies, default behavior
493        (None, None) => endpoint.connect_lazy(),
494
495        // either proxy may use https/http, or socks.
496        (Some(https_proxy), None) => connect_to(https_proxy, Intercept::Https)?,
497        (None, Some(http_proxy)) => connect_to(http_proxy, Intercept::Http)?,
498
499        // both proxies are set. If they are the same, they can be socks5. If there are different, they
500        // must both be `https?` URIs in order to use the same `ProxyConnector`.
501        (Some(https_proxy), Some(http_proxy)) => {
502            if https_proxy == http_proxy {
503                connect_to(https_proxy, Intercept::All)?
504            } else {
505                let accepted = [https_proxy.scheme_str(), http_proxy.scheme_str()]
506                    .into_iter()
507                    .all(|scheme| matches!(scheme, Some("https" | "http")));
508                if accepted {
509                    let mut proxy_connector = ProxyConnector::new(connector)?;
510                    proxy_connector.extend_proxies(vec![
511                        Proxy::new(Intercept::Https, https_proxy),
512                        Proxy::new(Intercept::Http, http_proxy),
513                    ]);
514                    endpoint.connect_with_connector_lazy(proxy_connector)
515                } else {
516                    return Err(ChannelError::Mismatch {
517                        https_proxy,
518                        http_proxy,
519                    });
520                }
521            }
522        }
523    };
524
525    Ok(channel)
526}
527
528/// Get a [`Channel`] to the given [`Uri`] with QCS authentication set up already.
529///
530/// # Errors
531///
532/// See [`Error`]
533#[allow(clippy::result_large_err)]
534pub fn get_wrapped_channel(
535    uri: Uri,
536) -> Result<RefreshService<Channel, ClientConfiguration>, Error<TokenError>> {
537    wrap_channel(get_channel(uri)?)
538}
539
540/// Set up the given `channel` with QCS authentication.
541#[must_use]
542pub fn wrap_channel_with<C>(
543    channel: C,
544    config: ClientConfiguration,
545) -> RefreshService<C, ClientConfiguration>
546where
547    C: GrpcService<Body>,
548{
549    ServiceBuilder::new()
550        .layer(RefreshLayer::with_config(config))
551        .service(channel)
552}
553
554/// Set up the given `channel` which will automatically
555/// attempt to refresh its access token when a request fails
556/// do to an expired token
557pub fn wrap_channel_with_token_refresher<C, T>(
558    channel: C,
559    token_refresher: T,
560) -> RefreshService<C, T>
561where
562    C: GrpcService<Body>,
563    T: TokenRefresher + Clone + Send + Sync,
564{
565    ServiceBuilder::new()
566        .layer(RefreshLayer::with_refresher(token_refresher))
567        .service(channel)
568}
569
570/// Set up the given `channel` with QCS authentication.
571///
572/// # Errors
573///
574/// See [`Error`]
575#[allow(clippy::result_large_err)]
576pub fn wrap_channel<C>(
577    channel: C,
578) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
579where
580    C: GrpcService<Body>,
581{
582    Ok(wrap_channel_with(channel, {
583        ClientConfiguration::load_default()?
584    }))
585}
586
587/// Set up the given `channel` with QCS authentication.
588///
589/// # Errors
590///
591/// See [`Error`]
592#[allow(clippy::result_large_err)]
593pub fn wrap_channel_with_profile<C>(
594    channel: C,
595    profile: String,
596) -> Result<RefreshService<C, ClientConfiguration>, Error<TokenError>>
597where
598    C: GrpcService<Body>,
599{
600    Ok(wrap_channel_with(
601        channel,
602        ClientConfiguration::load_profile(profile)?,
603    ))
604}
605
606/// Add exponential backoff retry logic to the `channel`.
607pub fn wrap_channel_with_retry<C>(channel: C) -> RetryService<C>
608where
609    C: GrpcService<Body>,
610{
611    ServiceBuilder::new()
612        .layer(RetryLayer::default())
613        .service(channel)
614}
615
#[cfg(feature = "tracing")]
/// Add a tracing layer with OpenTelemetry semantics to the `channel`.
///
/// The layer is built from `base_url` and the given tracing `configuration`.
pub fn wrap_channel_with_tracing(
    channel: Channel,
    base_url: String,
    configuration: TracingConfiguration,
) -> CustomTraceService {
    let trace_layer = build_trace_layer(base_url, Some(&configuration));
    ServiceBuilder::new().layer(trace_layer).service(channel)
}