s2/
client.rs

1//! SDK client implementation.
2//!
3//! The module defines three clients:
4//!
5//! |                  | Operations               | API Service      |
6//! |------------------|--------------------------|------------------|
7//! | [`Client`]       | Account level operations | [AccountService] |
8//! | [`BasinClient`]  | Basin level operations   | [BasinService]   |
9//! | [`StreamClient`] | Stream level operations  | [StreamService]  |
10//!
//! To interact with any client, you need an authentication token which can be
//! generated by the web console at [s2.dev]. This token is passed to the
//! client via a [`ClientConfig`].
14//!
15//! Along with the authentication token, a [`ClientConfig`] defines other
16//! request parameters such as timeouts, S2 endpoints, etc.
17//!
18//! A client can be created using the corresponding `new()` method. To avoid
19//! creating multiple connections to each service, [`Client::basin_client`] can be
20//! used to create a [`BasinClient`] and [`BasinClient::stream_client`] can be
21//! used to create a [`StreamClient`].
22//!
//! **Note:** Even though the client creation operation is cheap, a
//! [`BasinClient`] should preferably be stored instead of calling
//! [`Client::basin_client`] multiple times: the account endpoint might differ
//! from the basin endpoint, in which case every call creates a new connection.
//! See [`S2Endpoints`].
28//!
29//! [AccountService]: https://s2.dev/docs/interface/grpc#accountservice
30//! [BasinService]: https://s2.dev/docs/interface/grpc#basinservice
31//! [StreamService]: https://s2.dev/docs/interface/grpc#streamservice
32//! [s2.dev]: https://s2.dev/dashboard
33
34use std::{env::VarError, fmt::Display, str::FromStr, time::Duration};
35
36use backon::{BackoffBuilder, ConstantBuilder, Retryable};
37use futures::StreamExt;
38use http::{HeaderValue, uri::Authority};
39use hyper_util::client::legacy::connect::HttpConnector;
40use secrecy::SecretString;
41use sync_docs::sync_docs;
42use tokio::{sync::mpsc, time::sleep};
43use tokio_stream::wrappers::ReceiverStream;
44use tonic::{
45    metadata::AsciiMetadataValue,
46    transport::{Channel, ClientTlsConfig, Endpoint},
47};
48use tonic_side_effect::{FrameSignal, RequestFrameMonitor};
49
50use crate::{
51    api::{
52        account_service_client::AccountServiceClient, basin_service_client::BasinServiceClient,
53        stream_service_client::StreamServiceClient,
54    },
55    append_session,
56    service::{
57        ServiceRequest, ServiceStreamingResponse, Streaming,
58        account::{
59            CreateBasinServiceRequest, DeleteBasinServiceRequest, GetBasinConfigServiceRequest,
60            ListBasinsServiceRequest, ReconfigureBasinServiceRequest,
61        },
62        basin::{
63            CreateStreamServiceRequest, DeleteStreamServiceRequest, GetStreamConfigServiceRequest,
64            ListStreamsServiceRequest, ReconfigureStreamServiceRequest,
65        },
66        send_request,
67        stream::{
68            AppendServiceRequest, CheckTailServiceRequest, ReadServiceRequest,
69            ReadSessionServiceRequest, ReadSessionStreamingResponse,
70        },
71    },
72    types::{self, MIB_BYTES, MeteredBytes},
73};
74
/// Connector argument passed when no custom connector is supplied.
///
/// Always `None`; the `HttpConnector` type only pins the otherwise
/// unconstrained connector type parameter of `ClientInner::new`.
const DEFAULT_CONNECTOR: Option<HttpConnector> = None;
76
/// Cloud provider hosting the S2 environment to connect to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum S2Cloud {
    /// S2 hosted on AWS.
    Aws,
}

impl S2Cloud {
    /// Canonical (lowercase) name of the AWS cloud.
    const AWS: &'static str = "aws";

    /// Canonical name of this cloud, as embedded in endpoint host names.
    fn as_str(&self) -> &'static str {
        match *self {
            S2Cloud::Aws => Self::AWS,
        }
    }
}

impl Display for S2Cloud {
    /// Writes the canonical cloud name (e.g. `aws`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

impl FromStr for S2Cloud {
    /// The rejected input, returned verbatim.
    type Err = String;

    /// Parses a cloud name, ignoring ASCII case.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        s.eq_ignore_ascii_case(Self::AWS)
            .then_some(Self::Aws)
            .ok_or_else(|| s.to_owned())
    }
}
111
/// Endpoint for connecting to an S2 basin.
///
/// Determines how the authority for basin-level and stream-level requests is
/// derived.
#[derive(Debug, Clone)]
pub enum BasinEndpoint {
    /// Parent zone for basins.
    /// DNS is used to route to the correct cell for the basin.
    ///
    /// The basin name is prepended to this zone as a subdomain to form the
    /// request authority.
    ParentZone(Authority),
    /// Direct cell endpoint.
    /// The `S2-Basin` header is included in requests to specify the basin,
    /// which is expected to be hosted by this cell.
    Direct(Authority),
}
123
/// Endpoints for the S2 environment.
///
/// Holds the authority used for account-level requests and the endpoint used
/// for basin-level and stream-level requests.
///
/// You can find the S2 endpoints in our [documentation].
///
/// [documentation]: https://s2.dev/docs/interface/endpoints
#[derive(Debug, Clone)]
pub struct S2Endpoints {
    /// Used by `AccountService` requests.
    pub account: Authority,
    /// Used by `BasinService` and `StreamService` requests.
    pub basin: BasinEndpoint,
}
136
/// Retry policy for append requests.
///
/// Selected via `ClientConfig::with_append_retry_policy`.
#[derive(Debug, Clone)]
pub enum AppendRetryPolicy {
    /// Retry all eligible failures encountered during an append.
    ///
    /// This could result in append batches being duplicated on the stream.
    All,

    /// Retry only failures with no side effects.
    ///
    /// Will not attempt to retry failures where it cannot be concluded whether
    /// an append may become durable, in order to prevent duplicates.
    NoSideEffects,
}
151
152impl S2Endpoints {
153    /// Get S2 endpoints for the specified cloud.
154    pub fn for_cloud(cloud: S2Cloud) -> Self {
155        Self {
156            account: format!("{cloud}.s2.dev")
157                .try_into()
158                .expect("valid authority"),
159            basin: BasinEndpoint::ParentZone(
160                format!("b.{cloud}.s2.dev")
161                    .try_into()
162                    .expect("valid authority"),
163            ),
164        }
165    }
166
167    /// Get S2 endpoints for the specified cell.
168    pub fn for_cell(
169        cloud: S2Cloud,
170        cell_id: impl Into<String>,
171    ) -> Result<Self, http::uri::InvalidUri> {
172        let cell_endpoint: Authority = format!("{}.o.{cloud}.s2.dev", cell_id.into()).try_into()?;
173        Ok(Self {
174            account: cell_endpoint.clone(),
175            basin: BasinEndpoint::Direct(cell_endpoint),
176        })
177    }
178
179    /// Get S2 endpoints from environment variables.
180    ///
181    /// The following environment variables are used:
182    /// - `S2_CLOUD`: Valid S2 cloud name. Defaults to AWS.
183    /// - `S2_ACCOUNT_ENDPOINT`: Overrides the account endpoint.
184    /// - `S2_BASIN_ENDPOINT`: Overrides the basin endpoint. The prefix `"{basin}."` indicates the
185    ///   basin endpoint is `ParentZone` else `Direct`.
186    pub fn from_env() -> Result<Self, String> {
187        let cloud: S2Cloud = std::env::var("S2_CLOUD")
188            .ok()
189            .as_deref()
190            .unwrap_or(S2Cloud::AWS)
191            .parse()
192            .map_err(|cloud| format!("Invalid S2_CLOUD: {cloud}"))?;
193
194        let mut endpoints = Self::for_cloud(cloud);
195
196        match std::env::var("S2_ACCOUNT_ENDPOINT") {
197            Ok(spec) => {
198                endpoints.account = spec
199                    .as_str()
200                    .try_into()
201                    .map_err(|_| format!("Invalid S2_ACCOUNT_ENDPOINT: {spec}"))?;
202            }
203            Err(VarError::NotPresent) => {}
204            Err(VarError::NotUnicode(_)) => {
205                return Err("Invalid S2_ACCOUNT_ENDPOINT: not Unicode".to_owned());
206            }
207        }
208
209        match std::env::var("S2_BASIN_ENDPOINT") {
210            Ok(spec) => {
211                endpoints.basin = if let Some(parent_zone) = spec.strip_prefix("{basin}.") {
212                    BasinEndpoint::ParentZone(
213                        parent_zone
214                            .try_into()
215                            .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
216                    )
217                } else {
218                    BasinEndpoint::Direct(
219                        spec.as_str()
220                            .try_into()
221                            .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
222                    )
223                }
224            }
225            Err(VarError::NotPresent) => {}
226            Err(VarError::NotUnicode(_)) => {
227                return Err("Invalid S2_BASIN_ENDPOINT: not Unicode".to_owned());
228            }
229        }
230
231        Ok(endpoints)
232    }
233}
234
/// Client configuration.
///
/// Created with `ClientConfig::new` and customized through the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Authentication token sent with requests.
    pub(crate) token: SecretString,
    /// Account and basin endpoints to connect to.
    pub(crate) endpoints: S2Endpoints,
    /// Timeout for establishing (and re-establishing) a connection.
    pub(crate) connection_timeout: Duration,
    /// Timeout applied to each request.
    pub(crate) request_timeout: Duration,
    /// User agent header value.
    pub(crate) user_agent: HeaderValue,
    /// Which append failures are eligible for retries.
    pub(crate) append_retry_policy: AppendRetryPolicy,
    /// URI scheme used when building the endpoint (custom connectors only).
    #[cfg(feature = "connector")]
    pub(crate) uri_scheme: http::uri::Scheme,
    /// Base delay between retries.
    pub(crate) retry_backoff_duration: Duration,
    /// Maximum number of attempts per request.
    pub(crate) max_attempts: usize,
    /// Cap on inflight append bytes per append session.
    pub(crate) max_append_inflight_bytes: u64,
    /// Whether compression is enabled for requests and responses.
    pub(crate) compression: bool,
}
251
252impl ClientConfig {
253    /// Initialize a default client configuration with the specified authentication token.
254    pub fn new(token: impl Into<String>) -> Self {
255        Self {
256            token: token.into().into(),
257            endpoints: S2Endpoints::for_cloud(S2Cloud::Aws),
258            connection_timeout: Duration::from_secs(3),
259            request_timeout: Duration::from_secs(5),
260            user_agent: "s2-sdk-rust".parse().expect("valid user agent"),
261            append_retry_policy: AppendRetryPolicy::All,
262            #[cfg(feature = "connector")]
263            uri_scheme: http::uri::Scheme::HTTPS,
264            retry_backoff_duration: Duration::from_millis(100),
265            max_attempts: 3,
266            max_append_inflight_bytes: 100 * MIB_BYTES,
267            compression: false,
268        }
269    }
270
271    /// S2 endpoints to connect to.
272    pub fn with_endpoints(self, host_endpoints: impl Into<S2Endpoints>) -> Self {
273        Self {
274            endpoints: host_endpoints.into(),
275            ..self
276        }
277    }
278
279    /// Timeout for connecting and transparently reconnecting. Defaults to 3s.
280    pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
281        Self {
282            connection_timeout: connection_timeout.into(),
283            ..self
284        }
285    }
286
287    /// Timeout for a particular request. Defaults to 5s.
288    pub fn with_request_timeout(self, request_timeout: impl Into<Duration>) -> Self {
289        Self {
290            request_timeout: request_timeout.into(),
291            ..self
292        }
293    }
294
295    /// User agent. Defaults to `s2-sdk-rust`. Feel free to say hi.
296    pub fn with_user_agent(self, user_agent: HeaderValue) -> Self {
297        Self { user_agent, ..self }
298    }
299
300    /// Retry policy for appends.
301    /// Only relevant if `max_attempts > 1`.
302    ///
303    /// Defaults to retries of all failures, meaning duplicates on a stream are possible.
304    pub fn with_append_retry_policy(
305        self,
306        append_retry_policy: impl Into<AppendRetryPolicy>,
307    ) -> Self {
308        Self {
309            append_retry_policy: append_retry_policy.into(),
310            ..self
311        }
312    }
313
314    /// Maximum total size of currently inflight (pending acknowledgment) append
315    /// batches, per append session, as measured by `MeteredSize` formula.
316    ///
317    /// Must be at least 1 MiB. Defaults to 100 MiB.
318    pub fn with_max_append_inflight_bytes(self, max_append_inflight_bytes: u64) -> Self {
319        assert!(
320            max_append_inflight_bytes >= MIB_BYTES,
321            "max_append_inflight_bytes must be at least 1MiB"
322        );
323        Self {
324            max_append_inflight_bytes,
325            ..self
326        }
327    }
328
329    /// URI scheme to use when connecting with a custom connector. Defaults to `https`.
330    #[cfg(feature = "connector")]
331    pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
332        Self {
333            uri_scheme: uri_scheme.into(),
334            ..self
335        }
336    }
337
338    /// Backoff duration when retrying.
339    /// Defaults to 100ms.
340    /// A jitter is always applied.
341    pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
342        Self {
343            retry_backoff_duration: retry_backoff_duration.into(),
344            ..self
345        }
346    }
347
348    /// Maximum number of attempts per request.
349    /// Setting it to 1 disables retrying.
350    /// The default is to make 3 attempts.
351    pub fn with_max_attempts(self, max_attempts: usize) -> Self {
352        assert!(max_attempts > 0, "max attempts must be greater than 0");
353        Self {
354            max_attempts,
355            ..self
356        }
357    }
358
359    /// Configure compression for requests and responses.
360    /// Disabled by default.
361    pub fn with_compression(self, compression: bool) -> Self {
362        Self {
363            compression,
364            ..self
365        }
366    }
367}
368
/// Error from client operations.
///
/// Both variants are transparent wrappers: the display message and source
/// are those of the wrapped error.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ClientError {
    /// SDK type conversion errors.
    ///
    /// Indicates an incompatibility between the SDK version and service.
    #[error(transparent)]
    Conversion(#[from] types::ConvertError),
    /// Error status from service.
    #[error(transparent)]
    Service(#[from] tonic::Status),
}
381
/// Client for account-level operations.
///
/// Cloning is supported; a clone wraps a clone of the same inner state.
#[derive(Debug, Clone)]
pub struct Client {
    /// Channel, configuration and client kind backing this client.
    inner: ClientInner,
}
387
388impl Client {
389    /// Create a new SDK client.
390    pub fn new(config: ClientConfig) -> Self {
391        Self {
392            inner: ClientInner::new(ClientKind::Account, config, DEFAULT_CONNECTOR),
393        }
394    }
395
396    /// Create a new SDK client using a custom connector.
397    #[cfg(feature = "connector")]
398    pub fn new_with_connector<C>(config: ClientConfig, connector: C) -> Self
399    where
400        C: tower_service::Service<http::Uri> + Send + 'static,
401        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
402        C::Future: Send,
403        C::Error: std::error::Error + Send + Sync + 'static,
404    {
405        Self {
406            inner: ClientInner::new(ClientKind::Account, config, Some(connector)),
407        }
408    }
409
410    /// Create basin client from the given name.
411    pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
412        BasinClient {
413            inner: self.inner.for_basin(basin),
414        }
415    }
416
417    #[sync_docs]
418    pub async fn list_basins(
419        &self,
420        req: types::ListBasinsRequest,
421    ) -> Result<types::ListBasinsResponse, ClientError> {
422        self.inner
423            .send_retryable(ListBasinsServiceRequest::new(
424                self.inner.account_service_client(),
425                req,
426            ))
427            .await
428    }
429
430    #[sync_docs]
431    pub async fn create_basin(
432        &self,
433        req: types::CreateBasinRequest,
434    ) -> Result<types::BasinInfo, ClientError> {
435        self.inner
436            .send_retryable(CreateBasinServiceRequest::new(
437                self.inner.account_service_client(),
438                req,
439            ))
440            .await
441    }
442
443    #[sync_docs]
444    pub async fn delete_basin(&self, req: types::DeleteBasinRequest) -> Result<(), ClientError> {
445        self.inner
446            .send_retryable(DeleteBasinServiceRequest::new(
447                self.inner.account_service_client(),
448                req,
449            ))
450            .await
451    }
452
453    #[sync_docs]
454    pub async fn get_basin_config(
455        &self,
456        basin: types::BasinName,
457    ) -> Result<types::BasinConfig, ClientError> {
458        self.inner
459            .send_retryable(GetBasinConfigServiceRequest::new(
460                self.inner.account_service_client(),
461                basin,
462            ))
463            .await
464    }
465
466    #[sync_docs]
467    pub async fn reconfigure_basin(
468        &self,
469        req: types::ReconfigureBasinRequest,
470    ) -> Result<types::BasinConfig, ClientError> {
471        self.inner
472            .send_retryable(ReconfigureBasinServiceRequest::new(
473                self.inner.account_service_client(),
474                req,
475            ))
476            .await
477    }
478}
479
/// Client for basin-level operations.
///
/// Cloning is supported; a clone wraps a clone of the same inner state.
#[derive(Debug, Clone)]
pub struct BasinClient {
    /// Channel, configuration and client kind backing this client.
    inner: ClientInner,
}
485
486impl BasinClient {
487    /// Create a new basin client.
488    pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
489        Self {
490            inner: ClientInner::new(ClientKind::Basin(basin), config, DEFAULT_CONNECTOR),
491        }
492    }
493
494    /// Create a new basin client using a custom connector.
495    #[cfg(feature = "connector")]
496    pub fn new_with_connector<C>(
497        config: ClientConfig,
498        basin: types::BasinName,
499        connector: C,
500    ) -> Self
501    where
502        C: tower_service::Service<http::Uri> + Send + 'static,
503        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
504        C::Future: Send,
505        C::Error: std::error::Error + Send + Sync + 'static,
506    {
507        Self {
508            inner: ClientInner::new(ClientKind::Basin(basin), config, Some(connector)),
509        }
510    }
511
512    /// Create a new client for stream-level operations.
513    pub fn stream_client(&self, stream: impl Into<String>) -> StreamClient {
514        StreamClient {
515            inner: self.inner.clone(),
516            stream: stream.into(),
517        }
518    }
519
520    #[sync_docs]
521    pub async fn create_stream(
522        &self,
523        req: types::CreateStreamRequest,
524    ) -> Result<types::StreamInfo, ClientError> {
525        self.inner
526            .send_retryable(CreateStreamServiceRequest::new(
527                self.inner.basin_service_client(),
528                req,
529            ))
530            .await
531    }
532
533    #[sync_docs]
534    pub async fn list_streams(
535        &self,
536        req: types::ListStreamsRequest,
537    ) -> Result<types::ListStreamsResponse, ClientError> {
538        self.inner
539            .send_retryable(ListStreamsServiceRequest::new(
540                self.inner.basin_service_client(),
541                req,
542            ))
543            .await
544    }
545
546    #[sync_docs]
547    pub async fn get_stream_config(
548        &self,
549        stream: impl Into<String>,
550    ) -> Result<types::StreamConfig, ClientError> {
551        self.inner
552            .send_retryable(GetStreamConfigServiceRequest::new(
553                self.inner.basin_service_client(),
554                stream,
555            ))
556            .await
557    }
558
559    #[sync_docs]
560    pub async fn reconfigure_stream(
561        &self,
562        req: types::ReconfigureStreamRequest,
563    ) -> Result<types::StreamConfig, ClientError> {
564        self.inner
565            .send(ReconfigureStreamServiceRequest::new(
566                self.inner.basin_service_client(),
567                req,
568            ))
569            .await
570    }
571
572    #[sync_docs]
573    pub async fn delete_stream(&self, req: types::DeleteStreamRequest) -> Result<(), ClientError> {
574        self.inner
575            .send_retryable(DeleteStreamServiceRequest::new(
576                self.inner.basin_service_client(),
577                req,
578            ))
579            .await
580    }
581}
582
/// Client for stream-level operations.
///
/// Cloning is supported; a clone wraps a clone of the same inner state and
/// stream name.
#[derive(Debug, Clone)]
pub struct StreamClient {
    /// Channel, configuration and client kind backing this client.
    pub(crate) inner: ClientInner,
    /// Name of the stream this client operates on.
    pub(crate) stream: String,
}
589
590impl StreamClient {
591    /// Create a new stream client.
592    pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
593        BasinClient::new(config, basin).stream_client(stream)
594    }
595
596    /// Create a new stream client using a custom connector.
597    #[cfg(feature = "connector")]
598    pub fn new_with_connector<C>(
599        config: ClientConfig,
600        basin: types::BasinName,
601        stream: impl Into<String>,
602        connector: C,
603    ) -> Self
604    where
605        C: tower_service::Service<http::Uri> + Send + 'static,
606        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
607        C::Future: Send,
608        C::Error: std::error::Error + Send + Sync + 'static,
609    {
610        BasinClient::new_with_connector(config, basin, connector).stream_client(stream)
611    }
612
613    #[sync_docs]
614    pub async fn check_tail(&self) -> Result<u64, ClientError> {
615        self.inner
616            .send_retryable(CheckTailServiceRequest::new(
617                self.inner.stream_service_client(),
618                &self.stream,
619            ))
620            .await
621    }
622
623    #[sync_docs]
624    pub async fn read(&self, req: types::ReadRequest) -> Result<types::ReadOutput, ClientError> {
625        self.inner
626            .send_retryable(ReadServiceRequest::new(
627                self.inner.stream_service_client(),
628                &self.stream,
629                req,
630                self.inner.config.compression,
631            ))
632            .await
633    }
634
635    #[sync_docs]
636    pub async fn read_session(
637        &self,
638        req: types::ReadSessionRequest,
639    ) -> Result<Streaming<types::ReadOutput>, ClientError> {
640        let request = ReadSessionServiceRequest::new(
641            self.inner.stream_service_client(),
642            &self.stream,
643            req,
644            self.inner.config.compression,
645        );
646        self.inner
647            .send_retryable(request.clone())
648            .await
649            .map(|responses| {
650                Box::pin(read_resumption_stream(
651                    request,
652                    responses,
653                    self.inner.clone(),
654                )) as _
655            })
656    }
657
658    #[sync_docs]
659    pub async fn append(
660        &self,
661        req: types::AppendInput,
662    ) -> Result<types::AppendOutput, ClientError> {
663        let frame_signal = FrameSignal::new();
664        self.inner
665            .send_retryable(AppendServiceRequest::new(
666                self.inner
667                    .frame_monitoring_stream_service_client(frame_signal.clone()),
668                self.inner.config.append_retry_policy.clone(),
669                frame_signal,
670                &self.stream,
671                req,
672                self.inner.config.compression,
673            ))
674            .await
675    }
676
677    #[sync_docs]
678    #[allow(clippy::unused_async)]
679    pub async fn append_session<S>(
680        &self,
681        req: S,
682    ) -> Result<Streaming<types::AppendOutput>, ClientError>
683    where
684        S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
685    {
686        let (response_tx, response_rx) = mpsc::channel(10);
687        _ = tokio::spawn(append_session::manage_session(
688            self.clone(),
689            req,
690            response_tx,
691            self.inner.config.compression,
692        ));
693
694        Ok(Box::pin(ReceiverStream::new(response_rx)))
695    }
696}
697
/// Which level of the API a client targets; determines the authority it
/// connects to.
#[derive(Debug, Clone)]
enum ClientKind {
    /// Account-level client, connecting to the account endpoint.
    Account,
    /// Client scoped to the named basin, connecting to the basin endpoint.
    Basin(types::BasinName),
}
703
704impl ClientKind {
705    fn to_authority(&self, endpoints: &S2Endpoints) -> Authority {
706        match self {
707            ClientKind::Account => endpoints.account.clone(),
708            ClientKind::Basin(basin) => match &endpoints.basin {
709                BasinEndpoint::ParentZone(zone) => format!("{basin}.{zone}")
710                    .try_into()
711                    .expect("valid authority as basin pre-validated"),
712                BasinEndpoint::Direct(endpoint) => endpoint.clone(),
713            },
714        }
715    }
716}
717
/// State shared by the account, basin and stream clients.
#[derive(Debug, Clone)]
pub(crate) struct ClientInner {
    /// Whether this is an account-level or a basin-scoped client.
    kind: ClientKind,
    /// Lazily-connected channel to the authority resolved for `kind`.
    channel: Channel,
    /// Configuration the client was created with.
    pub(crate) config: ClientConfig,
}
724
725impl ClientInner {
726    fn new<C>(kind: ClientKind, config: ClientConfig, connector: Option<C>) -> Self
727    where
728        C: tower_service::Service<http::Uri> + Send + 'static,
729        C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
730        C::Future: Send,
731        C::Error: std::error::Error + Send + Sync + 'static,
732    {
733        let authority = kind.to_authority(&config.endpoints);
734
735        #[cfg(not(feature = "connector"))]
736        let scheme = "https";
737        #[cfg(feature = "connector")]
738        let scheme = config.uri_scheme.as_str();
739
740        let endpoint = format!("{scheme}://{authority}")
741            .parse::<Endpoint>()
742            .expect("previously validated endpoint scheme and authority")
743            .user_agent(config.user_agent.clone())
744            .expect("converting HeaderValue into HeaderValue")
745            .http2_adaptive_window(true)
746            .tls_config(
747                ClientTlsConfig::default()
748                    .with_webpki_roots()
749                    .assume_http2(true),
750            )
751            .expect("valid TLS config")
752            .connect_timeout(config.connection_timeout)
753            .timeout(config.request_timeout);
754
755        let channel = if let Some(connector) = connector {
756            assert!(
757                matches!(&config.endpoints.basin, BasinEndpoint::Direct(a) if a == &config.endpoints.account),
758                "Connector only supported when connecting directly to a cell for account as well as basins"
759            );
760            endpoint.connect_with_connector_lazy(connector)
761        } else {
762            endpoint.connect_lazy()
763        };
764
765        Self {
766            kind,
767            channel,
768            config,
769        }
770    }
771
772    fn for_basin(&self, basin: types::BasinName) -> ClientInner {
773        let current_authority = self.kind.to_authority(&self.config.endpoints);
774        let new_kind = ClientKind::Basin(basin);
775        let new_authority = new_kind.to_authority(&self.config.endpoints);
776        if current_authority == new_authority {
777            Self {
778                kind: new_kind,
779                ..self.clone()
780            }
781        } else {
782            Self::new(new_kind, self.config.clone(), DEFAULT_CONNECTOR)
783        }
784    }
785
786    pub(crate) async fn send<T: ServiceRequest>(
787        &self,
788        service_req: T,
789    ) -> Result<T::Response, ClientError> {
790        let basin_header = match (&self.kind, &self.config.endpoints.basin) {
791            (ClientKind::Basin(basin), BasinEndpoint::Direct(_)) => {
792                Some(AsciiMetadataValue::from_str(basin).expect("valid"))
793            }
794            _ => None,
795        };
796        send_request(service_req, &self.config.token, basin_header).await
797    }
798
799    async fn send_retryable_with_backoff<T: ServiceRequest + Clone>(
800        &self,
801        service_req: T,
802        backoff_builder: impl BackoffBuilder,
803    ) -> Result<T::Response, ClientError> {
804        let retry_fn = || async { self.send(service_req.clone()).await };
805
806        retry_fn
807            .retry(backoff_builder)
808            .when(|e| service_req.should_retry(e))
809            .await
810    }
811
812    pub(crate) async fn send_retryable<T: ServiceRequest + Clone>(
813        &self,
814        service_req: T,
815    ) -> Result<T::Response, ClientError> {
816        self.send_retryable_with_backoff(service_req, self.backoff_builder())
817            .await
818    }
819
820    pub(crate) fn backoff_builder(&self) -> impl BackoffBuilder + use<> {
821        ConstantBuilder::default()
822            .with_delay(self.config.retry_backoff_duration)
823            .with_max_times(self.config.max_attempts)
824            .with_jitter()
825    }
826
827    fn account_service_client(&self) -> AccountServiceClient<Channel> {
828        AccountServiceClient::new(self.channel.clone())
829    }
830
831    fn basin_service_client(&self) -> BasinServiceClient<Channel> {
832        BasinServiceClient::new(self.channel.clone())
833    }
834
835    pub(crate) fn stream_service_client(&self) -> StreamServiceClient<Channel> {
836        StreamServiceClient::new(self.channel.clone())
837    }
838
839    pub(crate) fn frame_monitoring_stream_service_client(
840        &self,
841        frame_signal: FrameSignal,
842    ) -> StreamServiceClient<RequestFrameMonitor> {
843        StreamServiceClient::new(RequestFrameMonitor::new(self.channel.clone(), frame_signal))
844    }
845}
846
847fn read_resumption_stream(
848    mut request: ReadSessionServiceRequest,
849    mut responses: ServiceStreamingResponse<ReadSessionStreamingResponse>,
850    client: ClientInner,
851) -> impl Send + futures::Stream<Item = Result<types::ReadOutput, ClientError>> {
852    let mut backoff = None;
853    async_stream::stream! {
854        while let Some(item) = responses.next().await {
855            match item {
856                Err(e) if request.should_retry(&e) => {
857                    if backoff.is_none() {
858                        backoff = Some(client.backoff_builder().build());
859                    }
860                    if let Some(duration) = backoff.as_mut().and_then(|b| b.next()) {
861                        sleep(duration).await;
862                        if let Ok(new_responses) = client.send_retryable(request.clone()).await {
863                            responses = new_responses;
864                        } else {
865                            yield Err(e);
866                        }
867                    } else {
868                        yield Err(e);
869                    }
870                }
871                item => {
872                    if item.is_ok() {
873                        backoff = None;
874                    }
875                    if let Ok(types::ReadOutput::Batch(types::SequencedRecordBatch { records })) = &item {
876                        let req = request.req_mut();
877                        if let Some(record) = records.last() {
878                            req.start_seq_num = record.seq_num + 1;
879                        }
880                        if let Some(count) = req.limit.count.as_mut() {
881                            *count = count.saturating_sub(records.len() as u64);
882                        }
883                        if let Some(bytes) = req.limit.bytes.as_mut() {
884                            *bytes = bytes.saturating_sub(records.metered_bytes());
885                        }
886                    }
887                    yield item;
888                }
889            }
890        }
891    }
892}