s2/
client.rs

1//! SDK client implementation.
2//!
3//! The module defines three clients:
4//!
5//! |                  | Operations               | API Service      |
6//! |------------------|--------------------------|------------------|
7//! | [`Client`]       | Account level operations | [AccountService] |
8//! | [`BasinClient`]  | Basin level operations   | [BasinService]   |
9//! | [`StreamClient`] | Stream level operations  | [StreamService]  |
10//!
11//! To interact with any client, you need an authentication token which can be
12//! generated by the the web console at [s2.dev]. This token is passed to the
13//! client via a [`ClientConfig`].
14//!
15//! Along with the authentication token, a [`ClientConfig`] defines other
16//! request parameters such as timeouts, S2 endpoints, etc.
17//!
18//! A client can be created using the corresponding `new()` method. To avoid
19//! creating multiple connections to each service, [`Client::basin_client`] can be
20//! used to create a [`BasinClient`] and [`BasinClient::stream_client`] can be
21//! used to create a [`StreamClient`].
22//!
23//! **Note:** Even though the client creation operation is cheap, a
24//! [`BasinClient`] should preferably be stored instead of using
25//! [`Client::basin_client`] multiple times as the account endpoint might vary
26//! from the basin endpoint creating a new connection each time the request is
27//! sent. See [`S2Endpoints`].
28//!
29//! [AccountService]: https://s2.dev/docs/interface/grpc#accountservice
30//! [BasinService]: https://s2.dev/docs/interface/grpc#basinservice
31//! [StreamService]: https://s2.dev/docs/interface/grpc#streamservice
32//! [s2.dev]: https://s2.dev/dashboard
33
34use std::{env::VarError, fmt::Display, str::FromStr, time::Duration};
35
36use backon::{BackoffBuilder, ConstantBuilder, Retryable};
37use futures::StreamExt;
38use http::{HeaderValue, uri::Authority};
39use hyper_util::client::legacy::connect::HttpConnector;
40use secrecy::SecretString;
41use sync_docs::sync_docs;
42use tokio::{sync::mpsc, time::sleep};
43use tokio_stream::wrappers::ReceiverStream;
44use tonic::{
45    metadata::AsciiMetadataValue,
46    transport::{Channel, ClientTlsConfig, Endpoint},
47};
48use tonic_side_effect::{FrameSignal, RequestFrameMonitor};
49use tracing::warn;
50
51use crate::{
52    api::{
53        account_service_client::AccountServiceClient, basin_service_client::BasinServiceClient,
54        stream_service_client::StreamServiceClient,
55    },
56    append_session,
57    service::{
58        ServiceRequest, ServiceStreamingResponse, Streaming,
59        account::{
60            CreateBasinServiceRequest, DeleteBasinServiceRequest, GetBasinConfigServiceRequest,
61            IssueAccessTokenServiceRequest, ListAccessTokensServiceRequest,
62            ListBasinsServiceRequest, ReconfigureBasinServiceRequest,
63            RevokeAccessTokenServiceRequest,
64        },
65        basin::{
66            CreateStreamServiceRequest, DeleteStreamServiceRequest, GetStreamConfigServiceRequest,
67            ListStreamsServiceRequest, ReconfigureStreamServiceRequest,
68        },
69        send_request,
70        stream::{
71            AppendServiceRequest, CheckTailServiceRequest, ReadServiceRequest,
72            ReadSessionServiceRequest, ReadSessionStreamingResponse,
73        },
74    },
75    types::{
76        self, MIB_BYTES, MeteredBytes, RETRY_AFTER_MS_METADATA_KEY, ReadStart, StreamPosition,
77    },
78};
79
80const DEFAULT_CONNECTOR: Option<HttpConnector> = None;
81
82/// S2 cloud environment to connect with.
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84pub enum S2Cloud {
85    /// S2 running on AWS.
86    Aws,
87}
88
89impl S2Cloud {
90    const AWS: &'static str = "aws";
91
92    fn as_str(&self) -> &'static str {
93        match self {
94            Self::Aws => Self::AWS,
95        }
96    }
97}
98
99impl Display for S2Cloud {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        f.write_str(self.as_str())
102    }
103}
104
105impl FromStr for S2Cloud {
106    type Err = String;
107
108    fn from_str(s: &str) -> Result<Self, Self::Err> {
109        if s.eq_ignore_ascii_case(Self::AWS) {
110            Ok(Self::Aws)
111        } else {
112            Err(s.to_owned())
113        }
114    }
115}
116
117/// Endpoint for connecting to an S2 basin.
118#[derive(Debug, Clone)]
119pub enum BasinEndpoint {
120    /// Parent zone for basins.
121    /// DNS is used to route to the correct cell for the basin.
122    ParentZone(Authority),
123    /// Direct cell endpoint.
124    /// The `S2-Basin` header is included in requests to specify the basin,
125    /// which is expected to be hosted by this cell.
126    Direct(Authority),
127}
128
129/// Endpoints for the S2 environment.
130///
131/// You can find the S2 endpoints in our [documentation].
132///
133/// [documentation]: https://s2.dev/docs/interface/endpoints
134#[derive(Debug, Clone)]
135pub struct S2Endpoints {
136    /// Used by `AccountService` requests.
137    pub account: Authority,
138    /// Used by `BasinService` and `StreamService` requests.
139    pub basin: BasinEndpoint,
140}
141
142/// Retry policy for append requests.
143#[derive(Debug, Clone)]
144pub enum AppendRetryPolicy {
145    /// Retry all eligible failures encountered during an append.
146    ///
147    /// This could result in append batches being duplicated on the stream.
148    All,
149
150    /// Retry only failures with no side effects.
151    ///
152    /// Will not attempt to retry failures where it cannot be concluded whether
153    /// an append may become durable, in order to prevent duplicates.
154    NoSideEffects,
155}
156
157impl S2Endpoints {
158    /// Get S2 endpoints for the specified cloud.
159    pub fn for_cloud(cloud: S2Cloud) -> Self {
160        Self {
161            account: format!("{cloud}.s2.dev")
162                .try_into()
163                .expect("valid authority"),
164            basin: BasinEndpoint::ParentZone(
165                format!("b.{cloud}.s2.dev")
166                    .try_into()
167                    .expect("valid authority"),
168            ),
169        }
170    }
171
172    /// Get S2 endpoints for the specified cell.
173    pub fn for_cell(
174        cloud: S2Cloud,
175        cell_id: impl Into<String>,
176    ) -> Result<Self, http::uri::InvalidUri> {
177        let cell_endpoint: Authority = format!("{}.o.{cloud}.s2.dev", cell_id.into()).try_into()?;
178        Ok(Self {
179            account: cell_endpoint.clone(),
180            basin: BasinEndpoint::Direct(cell_endpoint),
181        })
182    }
183
184    /// Get S2 endpoints from environment variables.
185    ///
186    /// The following environment variables are used:
187    /// - `S2_CLOUD`: Valid S2 cloud name. Defaults to AWS.
188    /// - `S2_ACCOUNT_ENDPOINT`: Overrides the account endpoint.
189    /// - `S2_BASIN_ENDPOINT`: Overrides the basin endpoint. The prefix `"{basin}."` indicates the
190    ///   basin endpoint is `ParentZone` else `Direct`.
191    pub fn from_env() -> Result<Self, String> {
192        let cloud: S2Cloud = std::env::var("S2_CLOUD")
193            .ok()
194            .as_deref()
195            .unwrap_or(S2Cloud::AWS)
196            .parse()
197            .map_err(|cloud| format!("Invalid S2_CLOUD: {cloud}"))?;
198
199        let mut endpoints = Self::for_cloud(cloud);
200
201        match std::env::var("S2_ACCOUNT_ENDPOINT") {
202            Ok(spec) => {
203                endpoints.account = spec
204                    .as_str()
205                    .try_into()
206                    .map_err(|_| format!("Invalid S2_ACCOUNT_ENDPOINT: {spec}"))?;
207            }
208            Err(VarError::NotPresent) => {}
209            Err(VarError::NotUnicode(_)) => {
210                return Err("Invalid S2_ACCOUNT_ENDPOINT: not Unicode".to_owned());
211            }
212        }
213
214        match std::env::var("S2_BASIN_ENDPOINT") {
215            Ok(spec) => {
216                endpoints.basin = if let Some(parent_zone) = spec.strip_prefix("{basin}.") {
217                    BasinEndpoint::ParentZone(
218                        parent_zone
219                            .try_into()
220                            .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
221                    )
222                } else {
223                    BasinEndpoint::Direct(
224                        spec.as_str()
225                            .try_into()
226                            .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
227                    )
228                }
229            }
230            Err(VarError::NotPresent) => {}
231            Err(VarError::NotUnicode(_)) => {
232                return Err("Invalid S2_BASIN_ENDPOINT: not Unicode".to_owned());
233            }
234        }
235
236        Ok(endpoints)
237    }
238}
239
240/// Client configuration.
241#[derive(Debug, Clone)]
242pub struct ClientConfig {
243    pub(crate) token: SecretString,
244    pub(crate) endpoints: S2Endpoints,
245    pub(crate) connection_timeout: Duration,
246    pub(crate) request_timeout: Duration,
247    pub(crate) user_agent: HeaderValue,
248    pub(crate) append_retry_policy: AppendRetryPolicy,
249    #[cfg(feature = "connector")]
250    pub(crate) uri_scheme: http::uri::Scheme,
251    pub(crate) retry_backoff_duration: Duration,
252    pub(crate) max_attempts: usize,
253    pub(crate) max_append_inflight_bytes: u64,
254    pub(crate) compression: bool,
255}
256
257impl ClientConfig {
258    /// Initialize a default client configuration with the specified authentication token.
259    pub fn new(token: impl Into<String>) -> Self {
260        Self {
261            token: token.into().into(),
262            endpoints: S2Endpoints::for_cloud(S2Cloud::Aws),
263            connection_timeout: Duration::from_secs(3),
264            request_timeout: Duration::from_secs(5),
265            user_agent: "s2-sdk-rust".parse().expect("valid user agent"),
266            append_retry_policy: AppendRetryPolicy::All,
267            #[cfg(feature = "connector")]
268            uri_scheme: http::uri::Scheme::HTTPS,
269            retry_backoff_duration: Duration::from_millis(100),
270            max_attempts: 3,
271            max_append_inflight_bytes: 100 * MIB_BYTES,
272            compression: false,
273        }
274    }
275
276    /// S2 endpoints to connect to.
277    pub fn with_endpoints(self, host_endpoints: impl Into<S2Endpoints>) -> Self {
278        Self {
279            endpoints: host_endpoints.into(),
280            ..self
281        }
282    }
283
284    /// Timeout for connecting and transparently reconnecting. Defaults to 3s.
285    pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
286        Self {
287            connection_timeout: connection_timeout.into(),
288            ..self
289        }
290    }
291
292    /// Timeout for a particular request. Defaults to 5s.
293    pub fn with_request_timeout(self, request_timeout: impl Into<Duration>) -> Self {
294        Self {
295            request_timeout: request_timeout.into(),
296            ..self
297        }
298    }
299
300    /// User agent. Defaults to `s2-sdk-rust`. Feel free to say hi.
301    pub fn with_user_agent(self, user_agent: HeaderValue) -> Self {
302        Self { user_agent, ..self }
303    }
304
305    /// Retry policy for appends.
306    /// Only relevant if `max_attempts > 1`.
307    ///
308    /// Defaults to retries of all failures, meaning duplicates on a stream are possible.
309    pub fn with_append_retry_policy(
310        self,
311        append_retry_policy: impl Into<AppendRetryPolicy>,
312    ) -> Self {
313        Self {
314            append_retry_policy: append_retry_policy.into(),
315            ..self
316        }
317    }
318
319    /// Maximum total size of currently inflight (pending acknowledgment) append
320    /// batches, per append session, as measured by `MeteredSize` formula.
321    ///
322    /// Must be at least 1 MiB. Defaults to 100 MiB.
323    pub fn with_max_append_inflight_bytes(self, max_append_inflight_bytes: u64) -> Self {
324        assert!(
325            max_append_inflight_bytes >= MIB_BYTES,
326            "max_append_inflight_bytes must be at least 1MiB"
327        );
328        Self {
329            max_append_inflight_bytes,
330            ..self
331        }
332    }
333
334    /// URI scheme to use when connecting with a custom connector. Defaults to `https`.
335    #[cfg(feature = "connector")]
336    pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
337        Self {
338            uri_scheme: uri_scheme.into(),
339            ..self
340        }
341    }
342
343    /// Backoff duration when retrying.
344    /// Defaults to 100ms.
345    /// A jitter is always applied.
346    pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
347        Self {
348            retry_backoff_duration: retry_backoff_duration.into(),
349            ..self
350        }
351    }
352
353    /// Maximum number of attempts per request.
354    /// Setting it to 1 disables retrying.
355    /// The default is to make 3 attempts.
356    pub fn with_max_attempts(self, max_attempts: usize) -> Self {
357        assert!(max_attempts > 0, "max attempts must be greater than 0");
358        Self {
359            max_attempts,
360            ..self
361        }
362    }
363
364    /// Configure compression for requests and responses.
365    /// Disabled by default.
366    pub fn with_compression(self, compression: bool) -> Self {
367        Self {
368            compression,
369            ..self
370        }
371    }
372}
373
374/// Error from client operations.
375#[derive(Debug, Clone, thiserror::Error)]
376pub enum ClientError {
377    /// SDK type conversion errors.
378    ///
379    /// Indicates an incompatibility between the SDK version and service.
380    #[error(transparent)]
381    Conversion(#[from] types::ConvertError),
382    /// Error status from service.
383    #[error(transparent)]
384    Service(#[from] tonic::Status),
385}
386
387/// Client for account-level operations.
388#[derive(Debug, Clone)]
389pub struct Client {
390    inner: ClientInner,
391}
392
393impl Client {
394    /// Create a new SDK client.
395    pub fn new(config: ClientConfig) -> Self {
396        Self {
397            inner: ClientInner::new(ClientKind::Account, config, DEFAULT_CONNECTOR),
398        }
399    }
400
401    /// Create a new SDK client using a custom connector.
402    #[cfg(feature = "connector")]
403    pub fn new_with_connector<C>(config: ClientConfig, connector: C) -> Self
404    where
405        C: tower_service::Service<http::Uri> + Send + 'static,
406        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
407        C::Future: Send,
408        C::Error: std::error::Error + Send + Sync + 'static,
409    {
410        Self {
411            inner: ClientInner::new(ClientKind::Account, config, Some(connector)),
412        }
413    }
414
415    /// Create basin client from the given name.
416    pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
417        BasinClient {
418            inner: self.inner.for_basin(basin),
419        }
420    }
421
422    #[sync_docs]
423    pub async fn list_basins(
424        &self,
425        req: types::ListBasinsRequest,
426    ) -> Result<types::ListBasinsResponse, ClientError> {
427        self.inner
428            .send_retryable(ListBasinsServiceRequest::new(
429                self.inner.account_service_client(),
430                req,
431            ))
432            .await
433    }
434
435    #[sync_docs]
436    pub async fn create_basin(
437        &self,
438        req: types::CreateBasinRequest,
439    ) -> Result<types::BasinInfo, ClientError> {
440        self.inner
441            .send_retryable(CreateBasinServiceRequest::new(
442                self.inner.account_service_client(),
443                req,
444            ))
445            .await
446    }
447
448    #[sync_docs]
449    pub async fn delete_basin(&self, req: types::DeleteBasinRequest) -> Result<(), ClientError> {
450        self.inner
451            .send_retryable(DeleteBasinServiceRequest::new(
452                self.inner.account_service_client(),
453                req,
454            ))
455            .await
456    }
457
458    #[sync_docs]
459    pub async fn get_basin_config(
460        &self,
461        basin: types::BasinName,
462    ) -> Result<types::BasinConfig, ClientError> {
463        self.inner
464            .send_retryable(GetBasinConfigServiceRequest::new(
465                self.inner.account_service_client(),
466                basin,
467            ))
468            .await
469    }
470
471    #[sync_docs]
472    pub async fn reconfigure_basin(
473        &self,
474        req: types::ReconfigureBasinRequest,
475    ) -> Result<types::BasinConfig, ClientError> {
476        self.inner
477            .send_retryable(ReconfigureBasinServiceRequest::new(
478                self.inner.account_service_client(),
479                req,
480            ))
481            .await
482    }
483
484    #[sync_docs]
485    pub async fn issue_access_token(
486        &self,
487        info: types::AccessTokenInfo,
488    ) -> Result<String, ClientError> {
489        self.inner
490            .send_retryable(IssueAccessTokenServiceRequest::new(
491                self.inner.account_service_client(),
492                info,
493            ))
494            .await
495    }
496
497    #[sync_docs]
498    pub async fn revoke_access_token(
499        &self,
500        id: types::AccessTokenId,
501    ) -> Result<types::AccessTokenInfo, ClientError> {
502        self.inner
503            .send_retryable(RevokeAccessTokenServiceRequest::new(
504                self.inner.account_service_client(),
505                id,
506            ))
507            .await
508    }
509
510    #[sync_docs]
511    pub async fn list_access_tokens(
512        &self,
513        req: types::ListAccessTokensRequest,
514    ) -> Result<types::ListAccessTokensResponse, ClientError> {
515        self.inner
516            .send_retryable(ListAccessTokensServiceRequest::new(
517                self.inner.account_service_client(),
518                req,
519            ))
520            .await
521    }
522}
523
524/// Client for basin-level operations.
525#[derive(Debug, Clone)]
526pub struct BasinClient {
527    inner: ClientInner,
528}
529
530impl BasinClient {
531    /// Create a new basin client.
532    pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
533        Self {
534            inner: ClientInner::new(ClientKind::Basin(basin), config, DEFAULT_CONNECTOR),
535        }
536    }
537
538    /// Create a new basin client using a custom connector.
539    #[cfg(feature = "connector")]
540    pub fn new_with_connector<C>(
541        config: ClientConfig,
542        basin: types::BasinName,
543        connector: C,
544    ) -> Self
545    where
546        C: tower_service::Service<http::Uri> + Send + 'static,
547        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
548        C::Future: Send,
549        C::Error: std::error::Error + Send + Sync + 'static,
550    {
551        Self {
552            inner: ClientInner::new(ClientKind::Basin(basin), config, Some(connector)),
553        }
554    }
555
556    /// Create a new client for stream-level operations.
557    pub fn stream_client(&self, stream: impl Into<String>) -> StreamClient {
558        StreamClient {
559            inner: self.inner.clone(),
560            stream: stream.into(),
561        }
562    }
563
564    #[sync_docs]
565    pub async fn create_stream(
566        &self,
567        req: types::CreateStreamRequest,
568    ) -> Result<types::StreamInfo, ClientError> {
569        self.inner
570            .send_retryable(CreateStreamServiceRequest::new(
571                self.inner.basin_service_client(),
572                req,
573            ))
574            .await
575    }
576
577    #[sync_docs]
578    pub async fn list_streams(
579        &self,
580        req: types::ListStreamsRequest,
581    ) -> Result<types::ListStreamsResponse, ClientError> {
582        self.inner
583            .send_retryable(ListStreamsServiceRequest::new(
584                self.inner.basin_service_client(),
585                req,
586            ))
587            .await
588    }
589
590    #[sync_docs]
591    pub async fn get_stream_config(
592        &self,
593        stream: impl Into<String>,
594    ) -> Result<types::StreamConfig, ClientError> {
595        self.inner
596            .send_retryable(GetStreamConfigServiceRequest::new(
597                self.inner.basin_service_client(),
598                stream,
599            ))
600            .await
601    }
602
603    #[sync_docs]
604    pub async fn reconfigure_stream(
605        &self,
606        req: types::ReconfigureStreamRequest,
607    ) -> Result<types::StreamConfig, ClientError> {
608        self.inner
609            .send(ReconfigureStreamServiceRequest::new(
610                self.inner.basin_service_client(),
611                req,
612            ))
613            .await
614    }
615
616    #[sync_docs]
617    pub async fn delete_stream(&self, req: types::DeleteStreamRequest) -> Result<(), ClientError> {
618        self.inner
619            .send_retryable(DeleteStreamServiceRequest::new(
620                self.inner.basin_service_client(),
621                req,
622            ))
623            .await
624    }
625}
626
627/// Client for stream-level operations.
628#[derive(Debug, Clone)]
629pub struct StreamClient {
630    pub(crate) inner: ClientInner,
631    pub(crate) stream: String,
632}
633
634impl StreamClient {
635    /// Create a new stream client.
636    pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
637        BasinClient::new(config, basin).stream_client(stream)
638    }
639
640    /// Create a new stream client using a custom connector.
641    #[cfg(feature = "connector")]
642    pub fn new_with_connector<C>(
643        config: ClientConfig,
644        basin: types::BasinName,
645        stream: impl Into<String>,
646        connector: C,
647    ) -> Self
648    where
649        C: tower_service::Service<http::Uri> + Send + 'static,
650        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
651        C::Future: Send,
652        C::Error: std::error::Error + Send + Sync + 'static,
653    {
654        BasinClient::new_with_connector(config, basin, connector).stream_client(stream)
655    }
656
657    #[sync_docs]
658    pub async fn check_tail(&self) -> Result<StreamPosition, ClientError> {
659        self.inner
660            .send_retryable(CheckTailServiceRequest::new(
661                self.inner.stream_service_client(),
662                &self.stream,
663            ))
664            .await
665    }
666
667    #[sync_docs]
668    pub async fn read(&self, req: types::ReadRequest) -> Result<types::ReadOutput, ClientError> {
669        self.inner
670            .send_retryable(ReadServiceRequest::new(
671                self.inner.stream_service_client(),
672                &self.stream,
673                req,
674                self.inner.config.compression,
675            ))
676            .await
677    }
678
679    #[sync_docs]
680    pub async fn read_session(
681        &self,
682        req: types::ReadSessionRequest,
683    ) -> Result<Streaming<types::ReadOutput>, ClientError> {
684        let request = ReadSessionServiceRequest::new(
685            self.inner.stream_service_client(),
686            &self.stream,
687            req,
688            self.inner.config.compression,
689        );
690        self.inner
691            .send_retryable(request.clone())
692            .await
693            .map(|responses| {
694                Box::pin(read_resumption_stream(
695                    request,
696                    responses,
697                    self.inner.clone(),
698                )) as _
699            })
700    }
701
702    #[sync_docs]
703    pub async fn append(&self, req: types::AppendInput) -> Result<types::AppendAck, ClientError> {
704        let frame_signal = FrameSignal::new();
705        self.inner
706            .send_retryable(AppendServiceRequest::new(
707                self.inner
708                    .frame_monitoring_stream_service_client(frame_signal.clone()),
709                self.inner.config.append_retry_policy.clone(),
710                frame_signal,
711                &self.stream,
712                req,
713                self.inner.config.compression,
714            ))
715            .await
716    }
717
718    #[sync_docs]
719    #[allow(clippy::unused_async)]
720    pub async fn append_session<S>(
721        &self,
722        req: S,
723    ) -> Result<Streaming<types::AppendAck>, ClientError>
724    where
725        S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
726    {
727        let (response_tx, response_rx) = mpsc::channel(10);
728        _ = tokio::spawn(append_session::manage_session(
729            self.clone(),
730            req,
731            response_tx,
732            self.inner.config.compression,
733        ));
734
735        Ok(Box::pin(ReceiverStream::new(response_rx)))
736    }
737}
738
739#[derive(Debug, Clone)]
740enum ClientKind {
741    Account,
742    Basin(types::BasinName),
743}
744
745impl ClientKind {
746    fn to_authority(&self, endpoints: &S2Endpoints) -> Authority {
747        match self {
748            ClientKind::Account => endpoints.account.clone(),
749            ClientKind::Basin(basin) => match &endpoints.basin {
750                BasinEndpoint::ParentZone(zone) => format!("{basin}.{zone}")
751                    .try_into()
752                    .expect("valid authority as basin pre-validated"),
753                BasinEndpoint::Direct(endpoint) => endpoint.clone(),
754            },
755        }
756    }
757}
758
759#[derive(Debug, Clone)]
760pub(crate) struct ClientInner {
761    kind: ClientKind,
762    channel: Channel,
763    pub(crate) config: ClientConfig,
764}
765
766impl ClientInner {
767    fn new<C>(kind: ClientKind, config: ClientConfig, connector: Option<C>) -> Self
768    where
769        C: tower_service::Service<http::Uri> + Send + 'static,
770        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
771        C::Future: Send,
772        C::Error: std::error::Error + Send + Sync + 'static,
773    {
774        let authority = kind.to_authority(&config.endpoints);
775
776        #[cfg(not(feature = "connector"))]
777        let scheme = "https";
778        #[cfg(feature = "connector")]
779        let scheme = config.uri_scheme.as_str();
780
781        let endpoint = format!("{scheme}://{authority}")
782            .parse::<Endpoint>()
783            .expect("previously validated endpoint scheme and authority")
784            .user_agent(config.user_agent.clone())
785            .expect("converting HeaderValue into HeaderValue")
786            .http2_adaptive_window(true)
787            .tls_config(
788                ClientTlsConfig::default()
789                    .with_webpki_roots()
790                    .assume_http2(true),
791            )
792            .expect("valid TLS config")
793            .connect_timeout(config.connection_timeout)
794            .timeout(config.request_timeout);
795
796        let channel = if let Some(connector) = connector {
797            assert!(
798                matches!(&config.endpoints.basin, BasinEndpoint::Direct(a) if a == &config.endpoints.account),
799                "Connector only supported when connecting directly to a cell for account as well as basins"
800            );
801            endpoint.connect_with_connector_lazy(connector)
802        } else {
803            endpoint.connect_lazy()
804        };
805
806        Self {
807            kind,
808            channel,
809            config,
810        }
811    }
812
813    fn for_basin(&self, basin: types::BasinName) -> ClientInner {
814        let current_authority = self.kind.to_authority(&self.config.endpoints);
815        let new_kind = ClientKind::Basin(basin);
816        let new_authority = new_kind.to_authority(&self.config.endpoints);
817        if current_authority == new_authority {
818            Self {
819                kind: new_kind,
820                ..self.clone()
821            }
822        } else {
823            Self::new(new_kind, self.config.clone(), DEFAULT_CONNECTOR)
824        }
825    }
826
827    pub(crate) async fn send<T: ServiceRequest>(
828        &self,
829        service_req: T,
830    ) -> Result<T::Response, ClientError> {
831        let basin_header = match (&self.kind, &self.config.endpoints.basin) {
832            (ClientKind::Basin(basin), BasinEndpoint::Direct(_)) => {
833                Some(AsciiMetadataValue::from_str(basin).expect("valid"))
834            }
835            _ => None,
836        };
837        send_request(service_req, &self.config.token, basin_header).await
838    }
839
840    async fn send_retryable_with_backoff<T: ServiceRequest + Clone>(
841        &self,
842        service_req: T,
843        backoff_builder: impl BackoffBuilder,
844    ) -> Result<T::Response, ClientError> {
845        let retry_fn = || async { self.send(service_req.clone()).await };
846
847        retry_fn
848            .retry(backoff_builder)
849            .when(|e| service_req.should_retry(e))
850            .adjust(|e, backoff_duration| match e {
851                ClientError::Service(s) => {
852                    if let Some(value) = s.metadata().get(RETRY_AFTER_MS_METADATA_KEY) {
853                        if let Some(retry_after_ms) = value
854                            .to_str()
855                            .ok()
856                            .map(|v| v.parse())
857                            .transpose()
858                            .ok()
859                            .flatten()
860                        {
861                            Some(Duration::from_millis(retry_after_ms))
862                        } else {
863                            warn!(
864                                "Failed to convert {RETRY_AFTER_MS_METADATA_KEY} metadata to u64.
865                                Falling back to default backoff duration: {:?}",
866                                backoff_duration
867                            );
868                            backoff_duration
869                        }
870                    } else {
871                        backoff_duration
872                    }
873                }
874                _ => backoff_duration,
875            })
876            .await
877    }
878
879    pub(crate) async fn send_retryable<T: ServiceRequest + Clone>(
880        &self,
881        service_req: T,
882    ) -> Result<T::Response, ClientError> {
883        self.send_retryable_with_backoff(service_req, self.backoff_builder())
884            .await
885    }
886
887    pub(crate) fn backoff_builder(&self) -> impl BackoffBuilder + use<> {
888        ConstantBuilder::default()
889            .with_delay(self.config.retry_backoff_duration)
890            .with_max_times(self.config.max_attempts)
891            .with_jitter()
892    }
893
894    fn account_service_client(&self) -> AccountServiceClient<Channel> {
895        AccountServiceClient::new(self.channel.clone())
896    }
897
898    fn basin_service_client(&self) -> BasinServiceClient<Channel> {
899        BasinServiceClient::new(self.channel.clone())
900    }
901
902    pub(crate) fn stream_service_client(&self) -> StreamServiceClient<Channel> {
903        StreamServiceClient::new(self.channel.clone())
904    }
905
906    pub(crate) fn frame_monitoring_stream_service_client(
907        &self,
908        frame_signal: FrameSignal,
909    ) -> StreamServiceClient<RequestFrameMonitor> {
910        StreamServiceClient::new(RequestFrameMonitor::new(self.channel.clone(), frame_signal))
911    }
912}
913
914fn read_resumption_stream(
915    mut request: ReadSessionServiceRequest,
916    mut responses: ServiceStreamingResponse<ReadSessionStreamingResponse>,
917    client: ClientInner,
918) -> impl Send + futures::Stream<Item = Result<types::ReadOutput, ClientError>> {
919    let mut backoff = None;
920    async_stream::stream! {
921        while let Some(item) = responses.next().await {
922            match item {
923                Err(e) if request.should_retry(&e) => {
924                    if backoff.is_none() {
925                        backoff = Some(client.backoff_builder().build());
926                    }
927                    if let Some(duration) = backoff.as_mut().and_then(|b| b.next()) {
928                        sleep(duration).await;
929                        if let Ok(new_responses) = client.send_retryable(request.clone()).await {
930                            responses = new_responses;
931                        } else {
932                            yield Err(e);
933                        }
934                    } else {
935                        yield Err(e);
936                    }
937                }
938                item => {
939                    if item.is_ok() {
940                        backoff = None;
941                    }
942                    if let Ok(types::ReadOutput::Batch(types::SequencedRecordBatch { records })) = &item {
943                        let req = request.req_mut();
944                        if let Some(record) = records.last() {
945                            req.start = ReadStart::SeqNum(record.seq_num + 1);
946                        }
947                        if let Some(count) = req.limit.count.as_mut() {
948                            *count = count.saturating_sub(records.len() as u64);
949                        }
950                        if let Some(bytes) = req.limit.bytes.as_mut() {
951                            *bytes = bytes.saturating_sub(records.metered_bytes());
952                        }
953                    }
954                    yield item;
955                }
956            }
957        }
958    }
959}