s2/
client.rs

1//! SDK client implementation.
2//!
3//! The module defines three clients:
4//!
5//! |                  | Operations               | API Service      |
6//! |------------------|--------------------------|------------------|
7//! | [`Client`]       | Account level operations | [AccountService] |
8//! | [`BasinClient`]  | Basin level operations   | [BasinService]   |
9//! | [`StreamClient`] | Stream level operations  | [StreamService]  |
10//!
11//! To interact with any client, you need an authentication token which can be
12//! generated by the the web console at [s2.dev]. This token is passed to the
13//! client via a [`ClientConfig`].
14//!
15//! Along with the authentication token, a [`ClientConfig`] defines other
16//! request parameters such as timeouts, S2 endpoints, etc.
17//!
18//! A client can be created using the corresponding `new()` method. To avoid
19//! creating multiple connections to each service, [`Client::basin_client`] can be
20//! used to create a [`BasinClient`] and [`BasinClient::stream_client`] can be
21//! used to create a [`StreamClient`].
22//!
23//! **Note:** Even though the client creation operation is cheap, a
24//! [`BasinClient`] should preferably be stored instead of using
25//! [`Client::basin_client`] multiple times as the account endpoint might vary
26//! from the basin endpoint creating a new connection each time the request is
27//! sent. See [`S2Endpoints`].
28//!
29//! [AccountService]: https://s2.dev/docs/interface/grpc#accountservice
30//! [BasinService]: https://s2.dev/docs/interface/grpc#basinservice
31//! [StreamService]: https://s2.dev/docs/interface/grpc#streamservice
32//! [s2.dev]: https://s2.dev/dashboard
33
34use std::{env::VarError, fmt::Display, str::FromStr, time::Duration};
35
36use backon::{BackoffBuilder, ConstantBuilder, Retryable};
37use futures::StreamExt;
38use http::{HeaderValue, uri::Authority};
39use hyper_util::client::legacy::connect::HttpConnector;
40use secrecy::SecretString;
41use sync_docs::sync_docs;
42use tokio::{sync::mpsc, time::sleep};
43use tokio_stream::wrappers::ReceiverStream;
44use tonic::{
45    metadata::AsciiMetadataValue,
46    transport::{Channel, ClientTlsConfig, Endpoint},
47};
48use tonic_side_effect::{FrameSignal, RequestFrameMonitor};
49
50use crate::{
51    api::{
52        account_service_client::AccountServiceClient, basin_service_client::BasinServiceClient,
53        stream_service_client::StreamServiceClient,
54    },
55    append_session,
56    service::{
57        ServiceRequest, ServiceStreamingResponse, Streaming,
58        account::{
59            CreateBasinServiceRequest, DeleteBasinServiceRequest, GetBasinConfigServiceRequest,
60            IssueAccessTokenServiceRequest, ListAccessTokensServiceRequest,
61            ListBasinsServiceRequest, ReconfigureBasinServiceRequest,
62            RevokeAccessTokenServiceRequest,
63        },
64        basin::{
65            CreateStreamServiceRequest, DeleteStreamServiceRequest, GetStreamConfigServiceRequest,
66            ListStreamsServiceRequest, ReconfigureStreamServiceRequest,
67        },
68        send_request,
69        stream::{
70            AppendServiceRequest, CheckTailServiceRequest, ReadServiceRequest,
71            ReadSessionServiceRequest, ReadSessionStreamingResponse,
72        },
73    },
74    types::{self, MIB_BYTES, MeteredBytes},
75};
76
77const DEFAULT_CONNECTOR: Option<HttpConnector> = None;
78
79/// S2 cloud environment to connect with.
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub enum S2Cloud {
82    /// S2 running on AWS.
83    Aws,
84}
85
86impl S2Cloud {
87    const AWS: &'static str = "aws";
88
89    fn as_str(&self) -> &'static str {
90        match self {
91            Self::Aws => Self::AWS,
92        }
93    }
94}
95
96impl Display for S2Cloud {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.write_str(self.as_str())
99    }
100}
101
102impl FromStr for S2Cloud {
103    type Err = String;
104
105    fn from_str(s: &str) -> Result<Self, Self::Err> {
106        if s.eq_ignore_ascii_case(Self::AWS) {
107            Ok(Self::Aws)
108        } else {
109            Err(s.to_owned())
110        }
111    }
112}
113
114/// Endpoint for connecting to an S2 basin.
115#[derive(Debug, Clone)]
116pub enum BasinEndpoint {
117    /// Parent zone for basins.
118    /// DNS is used to route to the correct cell for the basin.
119    ParentZone(Authority),
120    /// Direct cell endpoint.
121    /// The `S2-Basin` header is included in requests to specify the basin,
122    /// which is expected to be hosted by this cell.
123    Direct(Authority),
124}
125
126/// Endpoints for the S2 environment.
127///
128/// You can find the S2 endpoints in our [documentation].
129///
130/// [documentation]: https://s2.dev/docs/interface/endpoints
131#[derive(Debug, Clone)]
132pub struct S2Endpoints {
133    /// Used by `AccountService` requests.
134    pub account: Authority,
135    /// Used by `BasinService` and `StreamService` requests.
136    pub basin: BasinEndpoint,
137}
138
139/// Retry policy for append requests.
140#[derive(Debug, Clone)]
141pub enum AppendRetryPolicy {
142    /// Retry all eligible failures encountered during an append.
143    ///
144    /// This could result in append batches being duplicated on the stream.
145    All,
146
147    /// Retry only failures with no side effects.
148    ///
149    /// Will not attempt to retry failures where it cannot be concluded whether
150    /// an append may become durable, in order to prevent duplicates.
151    NoSideEffects,
152}
153
154impl S2Endpoints {
155    /// Get S2 endpoints for the specified cloud.
156    pub fn for_cloud(cloud: S2Cloud) -> Self {
157        Self {
158            account: format!("{cloud}.s2.dev")
159                .try_into()
160                .expect("valid authority"),
161            basin: BasinEndpoint::ParentZone(
162                format!("b.{cloud}.s2.dev")
163                    .try_into()
164                    .expect("valid authority"),
165            ),
166        }
167    }
168
169    /// Get S2 endpoints for the specified cell.
170    pub fn for_cell(
171        cloud: S2Cloud,
172        cell_id: impl Into<String>,
173    ) -> Result<Self, http::uri::InvalidUri> {
174        let cell_endpoint: Authority = format!("{}.o.{cloud}.s2.dev", cell_id.into()).try_into()?;
175        Ok(Self {
176            account: cell_endpoint.clone(),
177            basin: BasinEndpoint::Direct(cell_endpoint),
178        })
179    }
180
181    /// Get S2 endpoints from environment variables.
182    ///
183    /// The following environment variables are used:
184    /// - `S2_CLOUD`: Valid S2 cloud name. Defaults to AWS.
185    /// - `S2_ACCOUNT_ENDPOINT`: Overrides the account endpoint.
186    /// - `S2_BASIN_ENDPOINT`: Overrides the basin endpoint. The prefix `"{basin}."` indicates the
187    ///   basin endpoint is `ParentZone` else `Direct`.
188    pub fn from_env() -> Result<Self, String> {
189        let cloud: S2Cloud = std::env::var("S2_CLOUD")
190            .ok()
191            .as_deref()
192            .unwrap_or(S2Cloud::AWS)
193            .parse()
194            .map_err(|cloud| format!("Invalid S2_CLOUD: {cloud}"))?;
195
196        let mut endpoints = Self::for_cloud(cloud);
197
198        match std::env::var("S2_ACCOUNT_ENDPOINT") {
199            Ok(spec) => {
200                endpoints.account = spec
201                    .as_str()
202                    .try_into()
203                    .map_err(|_| format!("Invalid S2_ACCOUNT_ENDPOINT: {spec}"))?;
204            }
205            Err(VarError::NotPresent) => {}
206            Err(VarError::NotUnicode(_)) => {
207                return Err("Invalid S2_ACCOUNT_ENDPOINT: not Unicode".to_owned());
208            }
209        }
210
211        match std::env::var("S2_BASIN_ENDPOINT") {
212            Ok(spec) => {
213                endpoints.basin = if let Some(parent_zone) = spec.strip_prefix("{basin}.") {
214                    BasinEndpoint::ParentZone(
215                        parent_zone
216                            .try_into()
217                            .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
218                    )
219                } else {
220                    BasinEndpoint::Direct(
221                        spec.as_str()
222                            .try_into()
223                            .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
224                    )
225                }
226            }
227            Err(VarError::NotPresent) => {}
228            Err(VarError::NotUnicode(_)) => {
229                return Err("Invalid S2_BASIN_ENDPOINT: not Unicode".to_owned());
230            }
231        }
232
233        Ok(endpoints)
234    }
235}
236
237/// Client configuration.
238#[derive(Debug, Clone)]
239pub struct ClientConfig {
240    pub(crate) token: SecretString,
241    pub(crate) endpoints: S2Endpoints,
242    pub(crate) connection_timeout: Duration,
243    pub(crate) request_timeout: Duration,
244    pub(crate) user_agent: HeaderValue,
245    pub(crate) append_retry_policy: AppendRetryPolicy,
246    #[cfg(feature = "connector")]
247    pub(crate) uri_scheme: http::uri::Scheme,
248    pub(crate) retry_backoff_duration: Duration,
249    pub(crate) max_attempts: usize,
250    pub(crate) max_append_inflight_bytes: u64,
251    pub(crate) compression: bool,
252}
253
254impl ClientConfig {
255    /// Initialize a default client configuration with the specified authentication token.
256    pub fn new(token: impl Into<String>) -> Self {
257        Self {
258            token: token.into().into(),
259            endpoints: S2Endpoints::for_cloud(S2Cloud::Aws),
260            connection_timeout: Duration::from_secs(3),
261            request_timeout: Duration::from_secs(5),
262            user_agent: "s2-sdk-rust".parse().expect("valid user agent"),
263            append_retry_policy: AppendRetryPolicy::All,
264            #[cfg(feature = "connector")]
265            uri_scheme: http::uri::Scheme::HTTPS,
266            retry_backoff_duration: Duration::from_millis(100),
267            max_attempts: 3,
268            max_append_inflight_bytes: 100 * MIB_BYTES,
269            compression: false,
270        }
271    }
272
273    /// S2 endpoints to connect to.
274    pub fn with_endpoints(self, host_endpoints: impl Into<S2Endpoints>) -> Self {
275        Self {
276            endpoints: host_endpoints.into(),
277            ..self
278        }
279    }
280
281    /// Timeout for connecting and transparently reconnecting. Defaults to 3s.
282    pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
283        Self {
284            connection_timeout: connection_timeout.into(),
285            ..self
286        }
287    }
288
289    /// Timeout for a particular request. Defaults to 5s.
290    pub fn with_request_timeout(self, request_timeout: impl Into<Duration>) -> Self {
291        Self {
292            request_timeout: request_timeout.into(),
293            ..self
294        }
295    }
296
297    /// User agent. Defaults to `s2-sdk-rust`. Feel free to say hi.
298    pub fn with_user_agent(self, user_agent: HeaderValue) -> Self {
299        Self { user_agent, ..self }
300    }
301
302    /// Retry policy for appends.
303    /// Only relevant if `max_attempts > 1`.
304    ///
305    /// Defaults to retries of all failures, meaning duplicates on a stream are possible.
306    pub fn with_append_retry_policy(
307        self,
308        append_retry_policy: impl Into<AppendRetryPolicy>,
309    ) -> Self {
310        Self {
311            append_retry_policy: append_retry_policy.into(),
312            ..self
313        }
314    }
315
316    /// Maximum total size of currently inflight (pending acknowledgment) append
317    /// batches, per append session, as measured by `MeteredSize` formula.
318    ///
319    /// Must be at least 1 MiB. Defaults to 100 MiB.
320    pub fn with_max_append_inflight_bytes(self, max_append_inflight_bytes: u64) -> Self {
321        assert!(
322            max_append_inflight_bytes >= MIB_BYTES,
323            "max_append_inflight_bytes must be at least 1MiB"
324        );
325        Self {
326            max_append_inflight_bytes,
327            ..self
328        }
329    }
330
331    /// URI scheme to use when connecting with a custom connector. Defaults to `https`.
332    #[cfg(feature = "connector")]
333    pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
334        Self {
335            uri_scheme: uri_scheme.into(),
336            ..self
337        }
338    }
339
340    /// Backoff duration when retrying.
341    /// Defaults to 100ms.
342    /// A jitter is always applied.
343    pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
344        Self {
345            retry_backoff_duration: retry_backoff_duration.into(),
346            ..self
347        }
348    }
349
350    /// Maximum number of attempts per request.
351    /// Setting it to 1 disables retrying.
352    /// The default is to make 3 attempts.
353    pub fn with_max_attempts(self, max_attempts: usize) -> Self {
354        assert!(max_attempts > 0, "max attempts must be greater than 0");
355        Self {
356            max_attempts,
357            ..self
358        }
359    }
360
361    /// Configure compression for requests and responses.
362    /// Disabled by default.
363    pub fn with_compression(self, compression: bool) -> Self {
364        Self {
365            compression,
366            ..self
367        }
368    }
369}
370
371/// Error from client operations.
372#[derive(Debug, Clone, thiserror::Error)]
373pub enum ClientError {
374    /// SDK type conversion errors.
375    ///
376    /// Indicates an incompatibility between the SDK version and service.
377    #[error(transparent)]
378    Conversion(#[from] types::ConvertError),
379    /// Error status from service.
380    #[error(transparent)]
381    Service(#[from] tonic::Status),
382}
383
384/// Client for account-level operations.
385#[derive(Debug, Clone)]
386pub struct Client {
387    inner: ClientInner,
388}
389
390impl Client {
391    /// Create a new SDK client.
392    pub fn new(config: ClientConfig) -> Self {
393        Self {
394            inner: ClientInner::new(ClientKind::Account, config, DEFAULT_CONNECTOR),
395        }
396    }
397
398    /// Create a new SDK client using a custom connector.
399    #[cfg(feature = "connector")]
400    pub fn new_with_connector<C>(config: ClientConfig, connector: C) -> Self
401    where
402        C: tower_service::Service<http::Uri> + Send + 'static,
403        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
404        C::Future: Send,
405        C::Error: std::error::Error + Send + Sync + 'static,
406    {
407        Self {
408            inner: ClientInner::new(ClientKind::Account, config, Some(connector)),
409        }
410    }
411
412    /// Create basin client from the given name.
413    pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
414        BasinClient {
415            inner: self.inner.for_basin(basin),
416        }
417    }
418
419    #[sync_docs]
420    pub async fn list_basins(
421        &self,
422        req: types::ListBasinsRequest,
423    ) -> Result<types::ListBasinsResponse, ClientError> {
424        self.inner
425            .send_retryable(ListBasinsServiceRequest::new(
426                self.inner.account_service_client(),
427                req,
428            ))
429            .await
430    }
431
432    #[sync_docs]
433    pub async fn create_basin(
434        &self,
435        req: types::CreateBasinRequest,
436    ) -> Result<types::BasinInfo, ClientError> {
437        self.inner
438            .send_retryable(CreateBasinServiceRequest::new(
439                self.inner.account_service_client(),
440                req,
441            ))
442            .await
443    }
444
445    #[sync_docs]
446    pub async fn delete_basin(&self, req: types::DeleteBasinRequest) -> Result<(), ClientError> {
447        self.inner
448            .send_retryable(DeleteBasinServiceRequest::new(
449                self.inner.account_service_client(),
450                req,
451            ))
452            .await
453    }
454
455    #[sync_docs]
456    pub async fn get_basin_config(
457        &self,
458        basin: types::BasinName,
459    ) -> Result<types::BasinConfig, ClientError> {
460        self.inner
461            .send_retryable(GetBasinConfigServiceRequest::new(
462                self.inner.account_service_client(),
463                basin,
464            ))
465            .await
466    }
467
468    #[sync_docs]
469    pub async fn reconfigure_basin(
470        &self,
471        req: types::ReconfigureBasinRequest,
472    ) -> Result<types::BasinConfig, ClientError> {
473        self.inner
474            .send_retryable(ReconfigureBasinServiceRequest::new(
475                self.inner.account_service_client(),
476                req,
477            ))
478            .await
479    }
480
481    #[sync_docs]
482    pub async fn issue_access_token(
483        &self,
484        info: types::AccessTokenInfo,
485    ) -> Result<String, ClientError> {
486        self.inner
487            .send_retryable(IssueAccessTokenServiceRequest::new(
488                self.inner.account_service_client(),
489                info,
490            ))
491            .await
492    }
493
494    #[sync_docs]
495    pub async fn revoke_access_token(
496        &self,
497        id: types::AccessTokenId,
498    ) -> Result<types::AccessTokenInfo, ClientError> {
499        self.inner
500            .send_retryable(RevokeAccessTokenServiceRequest::new(
501                self.inner.account_service_client(),
502                id,
503            ))
504            .await
505    }
506
507    #[sync_docs]
508    pub async fn list_access_tokens(
509        &self,
510        req: types::ListAccessTokensRequest,
511    ) -> Result<types::ListAccessTokensResponse, ClientError> {
512        self.inner
513            .send_retryable(ListAccessTokensServiceRequest::new(
514                self.inner.account_service_client(),
515                req,
516            ))
517            .await
518    }
519}
520
521/// Client for basin-level operations.
522#[derive(Debug, Clone)]
523pub struct BasinClient {
524    inner: ClientInner,
525}
526
527impl BasinClient {
528    /// Create a new basin client.
529    pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
530        Self {
531            inner: ClientInner::new(ClientKind::Basin(basin), config, DEFAULT_CONNECTOR),
532        }
533    }
534
535    /// Create a new basin client using a custom connector.
536    #[cfg(feature = "connector")]
537    pub fn new_with_connector<C>(
538        config: ClientConfig,
539        basin: types::BasinName,
540        connector: C,
541    ) -> Self
542    where
543        C: tower_service::Service<http::Uri> + Send + 'static,
544        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
545        C::Future: Send,
546        C::Error: std::error::Error + Send + Sync + 'static,
547    {
548        Self {
549            inner: ClientInner::new(ClientKind::Basin(basin), config, Some(connector)),
550        }
551    }
552
553    /// Create a new client for stream-level operations.
554    pub fn stream_client(&self, stream: impl Into<String>) -> StreamClient {
555        StreamClient {
556            inner: self.inner.clone(),
557            stream: stream.into(),
558        }
559    }
560
561    #[sync_docs]
562    pub async fn create_stream(
563        &self,
564        req: types::CreateStreamRequest,
565    ) -> Result<types::StreamInfo, ClientError> {
566        self.inner
567            .send_retryable(CreateStreamServiceRequest::new(
568                self.inner.basin_service_client(),
569                req,
570            ))
571            .await
572    }
573
574    #[sync_docs]
575    pub async fn list_streams(
576        &self,
577        req: types::ListStreamsRequest,
578    ) -> Result<types::ListStreamsResponse, ClientError> {
579        self.inner
580            .send_retryable(ListStreamsServiceRequest::new(
581                self.inner.basin_service_client(),
582                req,
583            ))
584            .await
585    }
586
587    #[sync_docs]
588    pub async fn get_stream_config(
589        &self,
590        stream: impl Into<String>,
591    ) -> Result<types::StreamConfig, ClientError> {
592        self.inner
593            .send_retryable(GetStreamConfigServiceRequest::new(
594                self.inner.basin_service_client(),
595                stream,
596            ))
597            .await
598    }
599
600    #[sync_docs]
601    pub async fn reconfigure_stream(
602        &self,
603        req: types::ReconfigureStreamRequest,
604    ) -> Result<types::StreamConfig, ClientError> {
605        self.inner
606            .send(ReconfigureStreamServiceRequest::new(
607                self.inner.basin_service_client(),
608                req,
609            ))
610            .await
611    }
612
613    #[sync_docs]
614    pub async fn delete_stream(&self, req: types::DeleteStreamRequest) -> Result<(), ClientError> {
615        self.inner
616            .send_retryable(DeleteStreamServiceRequest::new(
617                self.inner.basin_service_client(),
618                req,
619            ))
620            .await
621    }
622}
623
624/// Client for stream-level operations.
625#[derive(Debug, Clone)]
626pub struct StreamClient {
627    pub(crate) inner: ClientInner,
628    pub(crate) stream: String,
629}
630
631impl StreamClient {
632    /// Create a new stream client.
633    pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
634        BasinClient::new(config, basin).stream_client(stream)
635    }
636
637    /// Create a new stream client using a custom connector.
638    #[cfg(feature = "connector")]
639    pub fn new_with_connector<C>(
640        config: ClientConfig,
641        basin: types::BasinName,
642        stream: impl Into<String>,
643        connector: C,
644    ) -> Self
645    where
646        C: tower_service::Service<http::Uri> + Send + 'static,
647        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
648        C::Future: Send,
649        C::Error: std::error::Error + Send + Sync + 'static,
650    {
651        BasinClient::new_with_connector(config, basin, connector).stream_client(stream)
652    }
653
654    #[sync_docs]
655    pub async fn check_tail(&self) -> Result<u64, ClientError> {
656        self.inner
657            .send_retryable(CheckTailServiceRequest::new(
658                self.inner.stream_service_client(),
659                &self.stream,
660            ))
661            .await
662    }
663
664    #[sync_docs]
665    pub async fn read(&self, req: types::ReadRequest) -> Result<types::ReadOutput, ClientError> {
666        self.inner
667            .send_retryable(ReadServiceRequest::new(
668                self.inner.stream_service_client(),
669                &self.stream,
670                req,
671                self.inner.config.compression,
672            ))
673            .await
674    }
675
676    #[sync_docs]
677    pub async fn read_session(
678        &self,
679        req: types::ReadSessionRequest,
680    ) -> Result<Streaming<types::ReadOutput>, ClientError> {
681        let request = ReadSessionServiceRequest::new(
682            self.inner.stream_service_client(),
683            &self.stream,
684            req,
685            self.inner.config.compression,
686        );
687        self.inner
688            .send_retryable(request.clone())
689            .await
690            .map(|responses| {
691                Box::pin(read_resumption_stream(
692                    request,
693                    responses,
694                    self.inner.clone(),
695                )) as _
696            })
697    }
698
699    #[sync_docs]
700    pub async fn append(
701        &self,
702        req: types::AppendInput,
703    ) -> Result<types::AppendOutput, ClientError> {
704        let frame_signal = FrameSignal::new();
705        self.inner
706            .send_retryable(AppendServiceRequest::new(
707                self.inner
708                    .frame_monitoring_stream_service_client(frame_signal.clone()),
709                self.inner.config.append_retry_policy.clone(),
710                frame_signal,
711                &self.stream,
712                req,
713                self.inner.config.compression,
714            ))
715            .await
716    }
717
718    #[sync_docs]
719    #[allow(clippy::unused_async)]
720    pub async fn append_session<S>(
721        &self,
722        req: S,
723    ) -> Result<Streaming<types::AppendOutput>, ClientError>
724    where
725        S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
726    {
727        let (response_tx, response_rx) = mpsc::channel(10);
728        _ = tokio::spawn(append_session::manage_session(
729            self.clone(),
730            req,
731            response_tx,
732            self.inner.config.compression,
733        ));
734
735        Ok(Box::pin(ReceiverStream::new(response_rx)))
736    }
737}
738
739#[derive(Debug, Clone)]
740enum ClientKind {
741    Account,
742    Basin(types::BasinName),
743}
744
745impl ClientKind {
746    fn to_authority(&self, endpoints: &S2Endpoints) -> Authority {
747        match self {
748            ClientKind::Account => endpoints.account.clone(),
749            ClientKind::Basin(basin) => match &endpoints.basin {
750                BasinEndpoint::ParentZone(zone) => format!("{basin}.{zone}")
751                    .try_into()
752                    .expect("valid authority as basin pre-validated"),
753                BasinEndpoint::Direct(endpoint) => endpoint.clone(),
754            },
755        }
756    }
757}
758
759#[derive(Debug, Clone)]
760pub(crate) struct ClientInner {
761    kind: ClientKind,
762    channel: Channel,
763    pub(crate) config: ClientConfig,
764}
765
766impl ClientInner {
767    fn new<C>(kind: ClientKind, config: ClientConfig, connector: Option<C>) -> Self
768    where
769        C: tower_service::Service<http::Uri> + Send + 'static,
770        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
771        C::Future: Send,
772        C::Error: std::error::Error + Send + Sync + 'static,
773    {
774        let authority = kind.to_authority(&config.endpoints);
775
776        #[cfg(not(feature = "connector"))]
777        let scheme = "https";
778        #[cfg(feature = "connector")]
779        let scheme = config.uri_scheme.as_str();
780
781        let endpoint = format!("{scheme}://{authority}")
782            .parse::<Endpoint>()
783            .expect("previously validated endpoint scheme and authority")
784            .user_agent(config.user_agent.clone())
785            .expect("converting HeaderValue into HeaderValue")
786            .http2_adaptive_window(true)
787            .tls_config(
788                ClientTlsConfig::default()
789                    .with_webpki_roots()
790                    .assume_http2(true),
791            )
792            .expect("valid TLS config")
793            .connect_timeout(config.connection_timeout)
794            .timeout(config.request_timeout);
795
796        let channel = if let Some(connector) = connector {
797            assert!(
798                matches!(&config.endpoints.basin, BasinEndpoint::Direct(a) if a == &config.endpoints.account),
799                "Connector only supported when connecting directly to a cell for account as well as basins"
800            );
801            endpoint.connect_with_connector_lazy(connector)
802        } else {
803            endpoint.connect_lazy()
804        };
805
806        Self {
807            kind,
808            channel,
809            config,
810        }
811    }
812
813    fn for_basin(&self, basin: types::BasinName) -> ClientInner {
814        let current_authority = self.kind.to_authority(&self.config.endpoints);
815        let new_kind = ClientKind::Basin(basin);
816        let new_authority = new_kind.to_authority(&self.config.endpoints);
817        if current_authority == new_authority {
818            Self {
819                kind: new_kind,
820                ..self.clone()
821            }
822        } else {
823            Self::new(new_kind, self.config.clone(), DEFAULT_CONNECTOR)
824        }
825    }
826
827    pub(crate) async fn send<T: ServiceRequest>(
828        &self,
829        service_req: T,
830    ) -> Result<T::Response, ClientError> {
831        let basin_header = match (&self.kind, &self.config.endpoints.basin) {
832            (ClientKind::Basin(basin), BasinEndpoint::Direct(_)) => {
833                Some(AsciiMetadataValue::from_str(basin).expect("valid"))
834            }
835            _ => None,
836        };
837        send_request(service_req, &self.config.token, basin_header).await
838    }
839
840    async fn send_retryable_with_backoff<T: ServiceRequest + Clone>(
841        &self,
842        service_req: T,
843        backoff_builder: impl BackoffBuilder,
844    ) -> Result<T::Response, ClientError> {
845        let retry_fn = || async { self.send(service_req.clone()).await };
846
847        retry_fn
848            .retry(backoff_builder)
849            .when(|e| service_req.should_retry(e))
850            .await
851    }
852
853    pub(crate) async fn send_retryable<T: ServiceRequest + Clone>(
854        &self,
855        service_req: T,
856    ) -> Result<T::Response, ClientError> {
857        self.send_retryable_with_backoff(service_req, self.backoff_builder())
858            .await
859    }
860
861    pub(crate) fn backoff_builder(&self) -> impl BackoffBuilder + use<> {
862        ConstantBuilder::default()
863            .with_delay(self.config.retry_backoff_duration)
864            .with_max_times(self.config.max_attempts)
865            .with_jitter()
866    }
867
868    fn account_service_client(&self) -> AccountServiceClient<Channel> {
869        AccountServiceClient::new(self.channel.clone())
870    }
871
872    fn basin_service_client(&self) -> BasinServiceClient<Channel> {
873        BasinServiceClient::new(self.channel.clone())
874    }
875
876    pub(crate) fn stream_service_client(&self) -> StreamServiceClient<Channel> {
877        StreamServiceClient::new(self.channel.clone())
878    }
879
880    pub(crate) fn frame_monitoring_stream_service_client(
881        &self,
882        frame_signal: FrameSignal,
883    ) -> StreamServiceClient<RequestFrameMonitor> {
884        StreamServiceClient::new(RequestFrameMonitor::new(self.channel.clone(), frame_signal))
885    }
886}
887
888fn read_resumption_stream(
889    mut request: ReadSessionServiceRequest,
890    mut responses: ServiceStreamingResponse<ReadSessionStreamingResponse>,
891    client: ClientInner,
892) -> impl Send + futures::Stream<Item = Result<types::ReadOutput, ClientError>> {
893    let mut backoff = None;
894    async_stream::stream! {
895        while let Some(item) = responses.next().await {
896            match item {
897                Err(e) if request.should_retry(&e) => {
898                    if backoff.is_none() {
899                        backoff = Some(client.backoff_builder().build());
900                    }
901                    if let Some(duration) = backoff.as_mut().and_then(|b| b.next()) {
902                        sleep(duration).await;
903                        if let Ok(new_responses) = client.send_retryable(request.clone()).await {
904                            responses = new_responses;
905                        } else {
906                            yield Err(e);
907                        }
908                    } else {
909                        yield Err(e);
910                    }
911                }
912                item => {
913                    if item.is_ok() {
914                        backoff = None;
915                    }
916                    if let Ok(types::ReadOutput::Batch(types::SequencedRecordBatch { records })) = &item {
917                        let req = request.req_mut();
918                        if let Some(record) = records.last() {
919                            req.start_seq_num = record.seq_num + 1;
920                        }
921                        if let Some(count) = req.limit.count.as_mut() {
922                            *count = count.saturating_sub(records.len() as u64);
923                        }
924                        if let Some(bytes) = req.limit.bytes.as_mut() {
925                            *bytes = bytes.saturating_sub(records.metered_bytes());
926                        }
927                    }
928                    yield item;
929                }
930            }
931        }
932    }
933}