1use std::{env::VarError, fmt::Display, str::FromStr, time::Duration};
35
36use backon::{BackoffBuilder, ConstantBuilder, Retryable};
37use futures::StreamExt;
38use http::{HeaderValue, uri::Authority};
39use hyper_util::client::legacy::connect::HttpConnector;
40use secrecy::SecretString;
41use sync_docs::sync_docs;
42use tokio::{sync::mpsc, time::sleep};
43use tokio_stream::wrappers::ReceiverStream;
44use tonic::{
45 metadata::AsciiMetadataValue,
46 transport::{Channel, ClientTlsConfig, Endpoint},
47};
48use tonic_side_effect::{FrameSignal, RequestFrameMonitor};
49use tracing::warn;
50
51use crate::{
52 api::{
53 account_service_client::AccountServiceClient, basin_service_client::BasinServiceClient,
54 stream_service_client::StreamServiceClient,
55 },
56 append_session,
57 service::{
58 ServiceRequest, ServiceStreamingResponse, Streaming,
59 account::{
60 CreateBasinServiceRequest, DeleteBasinServiceRequest, GetBasinConfigServiceRequest,
61 IssueAccessTokenServiceRequest, ListAccessTokensServiceRequest,
62 ListBasinsServiceRequest, ReconfigureBasinServiceRequest,
63 RevokeAccessTokenServiceRequest,
64 },
65 basin::{
66 CreateStreamServiceRequest, DeleteStreamServiceRequest, GetStreamConfigServiceRequest,
67 ListStreamsServiceRequest, ReconfigureStreamServiceRequest,
68 },
69 send_request,
70 stream::{
71 AppendServiceRequest, CheckTailServiceRequest, ReadServiceRequest,
72 ReadSessionServiceRequest, ReadSessionStreamingResponse,
73 },
74 },
75 types::{
76 self, MIB_BYTES, MeteredBytes, RETRY_AFTER_MS_METADATA_KEY, ReadStart, StreamPosition,
77 },
78};
79
// Typed `None` handed to `ClientInner::new` when the caller supplies no custom connector.
// The `HttpConnector` type parameter only exists to pin the generic connector type.
const DEFAULT_CONNECTOR: Option<HttpConnector> = None;
81
/// Cloud in which S2 is hosted.
///
/// The textual form (used by [`Display`] and accepted by [`FromStr`]) is the lowercase cloud
/// name; parsing ignores ASCII case.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum S2Cloud {
    /// The `"aws"` cloud.
    Aws,
}

impl S2Cloud {
    /// Canonical spelling of [`S2Cloud::Aws`].
    const AWS: &'static str = "aws";

    /// Returns the canonical lowercase name of this cloud.
    fn as_str(&self) -> &'static str {
        match *self {
            S2Cloud::Aws => Self::AWS,
        }
    }
}

impl Display for S2Cloud {
    /// Writes the canonical cloud name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

impl FromStr for S2Cloud {
    /// The rejected input, returned verbatim.
    type Err = String;

    /// Parses a cloud name case-insensitively (ASCII); unknown names are returned as the error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        s.eq_ignore_ascii_case(Self::AWS)
            .then_some(Self::Aws)
            .ok_or_else(|| s.to_owned())
    }
}
116
/// How the authority for basin-scoped requests is determined.
#[derive(Debug, Clone)]
pub enum BasinEndpoint {
    /// Parent zone under which every basin gets its own host name: requests for a basin are
    /// sent to `{basin}.{zone}`.
    ParentZone(Authority),
    /// A single authority used as-is for every basin. In this mode the basin name is handed to
    /// `send_request` as a metadata value instead of being encoded in the host name.
    Direct(Authority),
}
128
/// Endpoints a client connects to: one for account-level requests and one rule for
/// basin-level requests.
///
/// Construct with [`S2Endpoints::for_cloud`], [`S2Endpoints::for_cell`] or
/// [`S2Endpoints::from_env`].
#[derive(Debug, Clone)]
pub struct S2Endpoints {
    /// Authority used by account-level clients.
    pub account: Authority,
    /// How the authority for basin-level (and stream-level) clients is derived.
    pub basin: BasinEndpoint,
}
141
/// Retry policy applied to append requests.
///
/// The policy is only stored and forwarded here (to `AppendServiceRequest`); its exact
/// semantics are implemented outside this file.
#[derive(Debug, Clone)]
pub enum AppendRetryPolicy {
    /// Default policy (see `ClientConfig::new`).
    // NOTE(review): presumably retries every retryable failure, which may duplicate records;
    // confirm against `AppendServiceRequest::should_retry`.
    All,

    // NOTE(review): presumably retries only when the frame monitor shows that no request
    // frame was sent, i.e. the append cannot have had side effects; confirm.
    NoSideEffects,
}
156
157impl S2Endpoints {
158 pub fn for_cloud(cloud: S2Cloud) -> Self {
160 Self {
161 account: format!("{cloud}.s2.dev")
162 .try_into()
163 .expect("valid authority"),
164 basin: BasinEndpoint::ParentZone(
165 format!("b.{cloud}.s2.dev")
166 .try_into()
167 .expect("valid authority"),
168 ),
169 }
170 }
171
172 pub fn for_cell(
174 cloud: S2Cloud,
175 cell_id: impl Into<String>,
176 ) -> Result<Self, http::uri::InvalidUri> {
177 let cell_endpoint: Authority = format!("{}.o.{cloud}.s2.dev", cell_id.into()).try_into()?;
178 Ok(Self {
179 account: cell_endpoint.clone(),
180 basin: BasinEndpoint::Direct(cell_endpoint),
181 })
182 }
183
184 pub fn from_env() -> Result<Self, String> {
192 let cloud: S2Cloud = std::env::var("S2_CLOUD")
193 .ok()
194 .as_deref()
195 .unwrap_or(S2Cloud::AWS)
196 .parse()
197 .map_err(|cloud| format!("Invalid S2_CLOUD: {cloud}"))?;
198
199 let mut endpoints = Self::for_cloud(cloud);
200
201 match std::env::var("S2_ACCOUNT_ENDPOINT") {
202 Ok(spec) => {
203 endpoints.account = spec
204 .as_str()
205 .try_into()
206 .map_err(|_| format!("Invalid S2_ACCOUNT_ENDPOINT: {spec}"))?;
207 }
208 Err(VarError::NotPresent) => {}
209 Err(VarError::NotUnicode(_)) => {
210 return Err("Invalid S2_ACCOUNT_ENDPOINT: not Unicode".to_owned());
211 }
212 }
213
214 match std::env::var("S2_BASIN_ENDPOINT") {
215 Ok(spec) => {
216 endpoints.basin = if let Some(parent_zone) = spec.strip_prefix("{basin}.") {
217 BasinEndpoint::ParentZone(
218 parent_zone
219 .try_into()
220 .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
221 )
222 } else {
223 BasinEndpoint::Direct(
224 spec.as_str()
225 .try_into()
226 .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
227 )
228 }
229 }
230 Err(VarError::NotPresent) => {}
231 Err(VarError::NotUnicode(_)) => {
232 return Err("Invalid S2_BASIN_ENDPOINT: not Unicode".to_owned());
233 }
234 }
235
236 Ok(endpoints)
237 }
238}
239
/// Configuration shared by `Client`, `BasinClient` and `StreamClient`.
///
/// Create with [`ClientConfig::new`] and customise with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Secret token passed to `send_request` with every request.
    pub(crate) token: SecretString,
    /// Account and basin endpoints to connect to.
    pub(crate) endpoints: S2Endpoints,
    /// Applied as the transport's connect timeout.
    pub(crate) connection_timeout: Duration,
    /// Applied as the transport's per-request timeout.
    pub(crate) request_timeout: Duration,
    /// User-agent header value set on the transport endpoint.
    pub(crate) user_agent: HeaderValue,
    /// Retry policy forwarded to append requests.
    pub(crate) append_retry_policy: AppendRetryPolicy,
    /// URI scheme used to build the endpoint URI (only with the `connector` feature; otherwise
    /// `https` is always used).
    #[cfg(feature = "connector")]
    pub(crate) uri_scheme: http::uri::Scheme,
    /// Constant delay (jitter is added) between retries.
    pub(crate) retry_backoff_duration: Duration,
    /// Retry budget handed to the backoff builder; must be greater than 0 when set through
    /// `with_max_attempts`.
    pub(crate) max_attempts: usize,
    /// Cap on in-flight append bytes; at least 1MiB when set through
    /// `with_max_append_inflight_bytes`.
    // NOTE(review): not read in this file; presumably enforced by the append session.
    pub(crate) max_append_inflight_bytes: u64,
    /// Compression flag forwarded to read and append requests.
    pub(crate) compression: bool,
}
256
257impl ClientConfig {
258 pub fn new(token: impl Into<String>) -> Self {
260 Self {
261 token: token.into().into(),
262 endpoints: S2Endpoints::for_cloud(S2Cloud::Aws),
263 connection_timeout: Duration::from_secs(3),
264 request_timeout: Duration::from_secs(5),
265 user_agent: "s2-sdk-rust".parse().expect("valid user agent"),
266 append_retry_policy: AppendRetryPolicy::All,
267 #[cfg(feature = "connector")]
268 uri_scheme: http::uri::Scheme::HTTPS,
269 retry_backoff_duration: Duration::from_millis(100),
270 max_attempts: 3,
271 max_append_inflight_bytes: 100 * MIB_BYTES,
272 compression: false,
273 }
274 }
275
276 pub fn with_endpoints(self, host_endpoints: impl Into<S2Endpoints>) -> Self {
278 Self {
279 endpoints: host_endpoints.into(),
280 ..self
281 }
282 }
283
284 pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
286 Self {
287 connection_timeout: connection_timeout.into(),
288 ..self
289 }
290 }
291
292 pub fn with_request_timeout(self, request_timeout: impl Into<Duration>) -> Self {
294 Self {
295 request_timeout: request_timeout.into(),
296 ..self
297 }
298 }
299
300 pub fn with_user_agent(self, user_agent: HeaderValue) -> Self {
302 Self { user_agent, ..self }
303 }
304
305 pub fn with_append_retry_policy(
310 self,
311 append_retry_policy: impl Into<AppendRetryPolicy>,
312 ) -> Self {
313 Self {
314 append_retry_policy: append_retry_policy.into(),
315 ..self
316 }
317 }
318
319 pub fn with_max_append_inflight_bytes(self, max_append_inflight_bytes: u64) -> Self {
324 assert!(
325 max_append_inflight_bytes >= MIB_BYTES,
326 "max_append_inflight_bytes must be at least 1MiB"
327 );
328 Self {
329 max_append_inflight_bytes,
330 ..self
331 }
332 }
333
334 #[cfg(feature = "connector")]
336 pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
337 Self {
338 uri_scheme: uri_scheme.into(),
339 ..self
340 }
341 }
342
343 pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
347 Self {
348 retry_backoff_duration: retry_backoff_duration.into(),
349 ..self
350 }
351 }
352
353 pub fn with_max_attempts(self, max_attempts: usize) -> Self {
357 assert!(max_attempts > 0, "max attempts must be greater than 0");
358 Self {
359 max_attempts,
360 ..self
361 }
362 }
363
364 pub fn with_compression(self, compression: bool) -> Self {
367 Self {
368 compression,
369 ..self
370 }
371 }
372}
373
/// Error returned by every client operation in this module.
///
/// Both variants are transparent wrappers: their `Display` output and source come from the
/// wrapped error.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ClientError {
    /// A `types::ConvertError` raised while converting between types.
    #[error(transparent)]
    Conversion(#[from] types::ConvertError),
    /// A gRPC status reported for the request.
    #[error(transparent)]
    Service(#[from] tonic::Status),
}
386
/// Client for account-level operations (basins and access tokens).
#[derive(Debug, Clone)]
pub struct Client {
    // Shared connection state and configuration; created with `ClientKind::Account`.
    inner: ClientInner,
}
392
393impl Client {
394 pub fn new(config: ClientConfig) -> Self {
396 Self {
397 inner: ClientInner::new(ClientKind::Account, config, DEFAULT_CONNECTOR),
398 }
399 }
400
401 #[cfg(feature = "connector")]
403 pub fn new_with_connector<C>(config: ClientConfig, connector: C) -> Self
404 where
405 C: tower_service::Service<http::Uri> + Send + 'static,
406 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
407 C::Future: Send,
408 C::Error: std::error::Error + Send + Sync + 'static,
409 {
410 Self {
411 inner: ClientInner::new(ClientKind::Account, config, Some(connector)),
412 }
413 }
414
415 pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
417 BasinClient {
418 inner: self.inner.for_basin(basin),
419 }
420 }
421
422 #[sync_docs]
423 pub async fn list_basins(
424 &self,
425 req: types::ListBasinsRequest,
426 ) -> Result<types::ListBasinsResponse, ClientError> {
427 self.inner
428 .send_retryable(ListBasinsServiceRequest::new(
429 self.inner.account_service_client(),
430 req,
431 ))
432 .await
433 }
434
435 #[sync_docs]
436 pub async fn create_basin(
437 &self,
438 req: types::CreateBasinRequest,
439 ) -> Result<types::BasinInfo, ClientError> {
440 self.inner
441 .send_retryable(CreateBasinServiceRequest::new(
442 self.inner.account_service_client(),
443 req,
444 ))
445 .await
446 }
447
448 #[sync_docs]
449 pub async fn delete_basin(&self, req: types::DeleteBasinRequest) -> Result<(), ClientError> {
450 self.inner
451 .send_retryable(DeleteBasinServiceRequest::new(
452 self.inner.account_service_client(),
453 req,
454 ))
455 .await
456 }
457
458 #[sync_docs]
459 pub async fn get_basin_config(
460 &self,
461 basin: types::BasinName,
462 ) -> Result<types::BasinConfig, ClientError> {
463 self.inner
464 .send_retryable(GetBasinConfigServiceRequest::new(
465 self.inner.account_service_client(),
466 basin,
467 ))
468 .await
469 }
470
471 #[sync_docs]
472 pub async fn reconfigure_basin(
473 &self,
474 req: types::ReconfigureBasinRequest,
475 ) -> Result<types::BasinConfig, ClientError> {
476 self.inner
477 .send_retryable(ReconfigureBasinServiceRequest::new(
478 self.inner.account_service_client(),
479 req,
480 ))
481 .await
482 }
483
484 #[sync_docs]
485 pub async fn issue_access_token(
486 &self,
487 info: types::AccessTokenInfo,
488 ) -> Result<String, ClientError> {
489 self.inner
490 .send_retryable(IssueAccessTokenServiceRequest::new(
491 self.inner.account_service_client(),
492 info,
493 ))
494 .await
495 }
496
497 #[sync_docs]
498 pub async fn revoke_access_token(
499 &self,
500 id: types::AccessTokenId,
501 ) -> Result<types::AccessTokenInfo, ClientError> {
502 self.inner
503 .send_retryable(RevokeAccessTokenServiceRequest::new(
504 self.inner.account_service_client(),
505 id,
506 ))
507 .await
508 }
509
510 #[sync_docs]
511 pub async fn list_access_tokens(
512 &self,
513 req: types::ListAccessTokensRequest,
514 ) -> Result<types::ListAccessTokensResponse, ClientError> {
515 self.inner
516 .send_retryable(ListAccessTokensServiceRequest::new(
517 self.inner.account_service_client(),
518 req,
519 ))
520 .await
521 }
522}
523
/// Client for basin-level operations (streams within one basin).
#[derive(Debug, Clone)]
pub struct BasinClient {
    // Shared connection state and configuration; its kind is `ClientKind::Basin`.
    inner: ClientInner,
}
529
530impl BasinClient {
531 pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
533 Self {
534 inner: ClientInner::new(ClientKind::Basin(basin), config, DEFAULT_CONNECTOR),
535 }
536 }
537
538 #[cfg(feature = "connector")]
540 pub fn new_with_connector<C>(
541 config: ClientConfig,
542 basin: types::BasinName,
543 connector: C,
544 ) -> Self
545 where
546 C: tower_service::Service<http::Uri> + Send + 'static,
547 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
548 C::Future: Send,
549 C::Error: std::error::Error + Send + Sync + 'static,
550 {
551 Self {
552 inner: ClientInner::new(ClientKind::Basin(basin), config, Some(connector)),
553 }
554 }
555
556 pub fn stream_client(&self, stream: impl Into<String>) -> StreamClient {
558 StreamClient {
559 inner: self.inner.clone(),
560 stream: stream.into(),
561 }
562 }
563
564 #[sync_docs]
565 pub async fn create_stream(
566 &self,
567 req: types::CreateStreamRequest,
568 ) -> Result<types::StreamInfo, ClientError> {
569 self.inner
570 .send_retryable(CreateStreamServiceRequest::new(
571 self.inner.basin_service_client(),
572 req,
573 ))
574 .await
575 }
576
577 #[sync_docs]
578 pub async fn list_streams(
579 &self,
580 req: types::ListStreamsRequest,
581 ) -> Result<types::ListStreamsResponse, ClientError> {
582 self.inner
583 .send_retryable(ListStreamsServiceRequest::new(
584 self.inner.basin_service_client(),
585 req,
586 ))
587 .await
588 }
589
590 #[sync_docs]
591 pub async fn get_stream_config(
592 &self,
593 stream: impl Into<String>,
594 ) -> Result<types::StreamConfig, ClientError> {
595 self.inner
596 .send_retryable(GetStreamConfigServiceRequest::new(
597 self.inner.basin_service_client(),
598 stream,
599 ))
600 .await
601 }
602
603 #[sync_docs]
604 pub async fn reconfigure_stream(
605 &self,
606 req: types::ReconfigureStreamRequest,
607 ) -> Result<types::StreamConfig, ClientError> {
608 self.inner
609 .send(ReconfigureStreamServiceRequest::new(
610 self.inner.basin_service_client(),
611 req,
612 ))
613 .await
614 }
615
616 #[sync_docs]
617 pub async fn delete_stream(&self, req: types::DeleteStreamRequest) -> Result<(), ClientError> {
618 self.inner
619 .send_retryable(DeleteStreamServiceRequest::new(
620 self.inner.basin_service_client(),
621 req,
622 ))
623 .await
624 }
625}
626
/// Client for stream-level operations (reads, appends, tail checks) on one stream.
#[derive(Debug, Clone)]
pub struct StreamClient {
    // Connection state shared with the `BasinClient` this client was derived from.
    pub(crate) inner: ClientInner,
    // Name of the stream every request of this client targets.
    pub(crate) stream: String,
}
633
634impl StreamClient {
635 pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
637 BasinClient::new(config, basin).stream_client(stream)
638 }
639
640 #[cfg(feature = "connector")]
642 pub fn new_with_connector<C>(
643 config: ClientConfig,
644 basin: types::BasinName,
645 stream: impl Into<String>,
646 connector: C,
647 ) -> Self
648 where
649 C: tower_service::Service<http::Uri> + Send + 'static,
650 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
651 C::Future: Send,
652 C::Error: std::error::Error + Send + Sync + 'static,
653 {
654 BasinClient::new_with_connector(config, basin, connector).stream_client(stream)
655 }
656
657 #[sync_docs]
658 pub async fn check_tail(&self) -> Result<StreamPosition, ClientError> {
659 self.inner
660 .send_retryable(CheckTailServiceRequest::new(
661 self.inner.stream_service_client(),
662 &self.stream,
663 ))
664 .await
665 }
666
667 #[sync_docs]
668 pub async fn read(&self, req: types::ReadRequest) -> Result<types::ReadOutput, ClientError> {
669 self.inner
670 .send_retryable(ReadServiceRequest::new(
671 self.inner.stream_service_client(),
672 &self.stream,
673 req,
674 self.inner.config.compression,
675 ))
676 .await
677 }
678
679 #[sync_docs]
680 pub async fn read_session(
681 &self,
682 req: types::ReadSessionRequest,
683 ) -> Result<Streaming<types::ReadOutput>, ClientError> {
684 let request = ReadSessionServiceRequest::new(
685 self.inner.stream_service_client(),
686 &self.stream,
687 req,
688 self.inner.config.compression,
689 );
690 self.inner
691 .send_retryable(request.clone())
692 .await
693 .map(|responses| {
694 Box::pin(read_resumption_stream(
695 request,
696 responses,
697 self.inner.clone(),
698 )) as _
699 })
700 }
701
702 #[sync_docs]
703 pub async fn append(&self, req: types::AppendInput) -> Result<types::AppendAck, ClientError> {
704 let frame_signal = FrameSignal::new();
705 self.inner
706 .send_retryable(AppendServiceRequest::new(
707 self.inner
708 .frame_monitoring_stream_service_client(frame_signal.clone()),
709 self.inner.config.append_retry_policy.clone(),
710 frame_signal,
711 &self.stream,
712 req,
713 self.inner.config.compression,
714 ))
715 .await
716 }
717
718 #[sync_docs]
719 #[allow(clippy::unused_async)]
720 pub async fn append_session<S>(
721 &self,
722 req: S,
723 ) -> Result<Streaming<types::AppendAck>, ClientError>
724 where
725 S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
726 {
727 let (response_tx, response_rx) = mpsc::channel(10);
728 _ = tokio::spawn(append_session::manage_session(
729 self.clone(),
730 req,
731 response_tx,
732 self.inner.config.compression,
733 ));
734
735 Ok(Box::pin(ReceiverStream::new(response_rx)))
736 }
737}
738
/// Scope of a `ClientInner`, which determines the authority it connects to.
#[derive(Debug, Clone)]
enum ClientKind {
    /// Account-level client; uses the account endpoint.
    Account,
    /// Client scoped to the named basin; uses the basin endpoint.
    Basin(types::BasinName),
}
744
745impl ClientKind {
746 fn to_authority(&self, endpoints: &S2Endpoints) -> Authority {
747 match self {
748 ClientKind::Account => endpoints.account.clone(),
749 ClientKind::Basin(basin) => match &endpoints.basin {
750 BasinEndpoint::ParentZone(zone) => format!("{basin}.{zone}")
751 .try_into()
752 .expect("valid authority as basin pre-validated"),
753 BasinEndpoint::Direct(endpoint) => endpoint.clone(),
754 },
755 }
756 }
757}
758
/// Connection state shared by `Client`, `BasinClient` and `StreamClient`.
#[derive(Debug, Clone)]
pub(crate) struct ClientInner {
    // Scope (account or a specific basin) this instance sends requests for.
    kind: ClientKind,
    // Lazily connected transport channel to the authority derived from `kind`.
    channel: Channel,
    // Configuration the channel was built from; also consulted per request.
    pub(crate) config: ClientConfig,
}
765
766impl ClientInner {
767 fn new<C>(kind: ClientKind, config: ClientConfig, connector: Option<C>) -> Self
768 where
769 C: tower_service::Service<http::Uri> + Send + 'static,
770 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
771 C::Future: Send,
772 C::Error: std::error::Error + Send + Sync + 'static,
773 {
774 let authority = kind.to_authority(&config.endpoints);
775
776 #[cfg(not(feature = "connector"))]
777 let scheme = "https";
778 #[cfg(feature = "connector")]
779 let scheme = config.uri_scheme.as_str();
780
781 let endpoint = format!("{scheme}://{authority}")
782 .parse::<Endpoint>()
783 .expect("previously validated endpoint scheme and authority")
784 .user_agent(config.user_agent.clone())
785 .expect("converting HeaderValue into HeaderValue")
786 .http2_adaptive_window(true)
787 .tls_config(
788 ClientTlsConfig::default()
789 .with_webpki_roots()
790 .assume_http2(true),
791 )
792 .expect("valid TLS config")
793 .connect_timeout(config.connection_timeout)
794 .timeout(config.request_timeout);
795
796 let channel = if let Some(connector) = connector {
797 assert!(
798 matches!(&config.endpoints.basin, BasinEndpoint::Direct(a) if a == &config.endpoints.account),
799 "Connector only supported when connecting directly to a cell for account as well as basins"
800 );
801 endpoint.connect_with_connector_lazy(connector)
802 } else {
803 endpoint.connect_lazy()
804 };
805
806 Self {
807 kind,
808 channel,
809 config,
810 }
811 }
812
813 fn for_basin(&self, basin: types::BasinName) -> ClientInner {
814 let current_authority = self.kind.to_authority(&self.config.endpoints);
815 let new_kind = ClientKind::Basin(basin);
816 let new_authority = new_kind.to_authority(&self.config.endpoints);
817 if current_authority == new_authority {
818 Self {
819 kind: new_kind,
820 ..self.clone()
821 }
822 } else {
823 Self::new(new_kind, self.config.clone(), DEFAULT_CONNECTOR)
824 }
825 }
826
827 pub(crate) async fn send<T: ServiceRequest>(
828 &self,
829 service_req: T,
830 ) -> Result<T::Response, ClientError> {
831 let basin_header = match (&self.kind, &self.config.endpoints.basin) {
832 (ClientKind::Basin(basin), BasinEndpoint::Direct(_)) => {
833 Some(AsciiMetadataValue::from_str(basin).expect("valid"))
834 }
835 _ => None,
836 };
837 send_request(service_req, &self.config.token, basin_header).await
838 }
839
840 async fn send_retryable_with_backoff<T: ServiceRequest + Clone>(
841 &self,
842 service_req: T,
843 backoff_builder: impl BackoffBuilder,
844 ) -> Result<T::Response, ClientError> {
845 let retry_fn = || async { self.send(service_req.clone()).await };
846
847 retry_fn
848 .retry(backoff_builder)
849 .when(|e| service_req.should_retry(e))
850 .adjust(|e, backoff_duration| match e {
851 ClientError::Service(s) => {
852 if let Some(value) = s.metadata().get(RETRY_AFTER_MS_METADATA_KEY) {
853 if let Some(retry_after_ms) = value
854 .to_str()
855 .ok()
856 .map(|v| v.parse())
857 .transpose()
858 .ok()
859 .flatten()
860 {
861 Some(Duration::from_millis(retry_after_ms))
862 } else {
863 warn!(
864 "Failed to convert {RETRY_AFTER_MS_METADATA_KEY} metadata to u64.
865 Falling back to default backoff duration: {:?}",
866 backoff_duration
867 );
868 backoff_duration
869 }
870 } else {
871 backoff_duration
872 }
873 }
874 _ => backoff_duration,
875 })
876 .await
877 }
878
879 pub(crate) async fn send_retryable<T: ServiceRequest + Clone>(
880 &self,
881 service_req: T,
882 ) -> Result<T::Response, ClientError> {
883 self.send_retryable_with_backoff(service_req, self.backoff_builder())
884 .await
885 }
886
887 pub(crate) fn backoff_builder(&self) -> impl BackoffBuilder + use<> {
888 ConstantBuilder::default()
889 .with_delay(self.config.retry_backoff_duration)
890 .with_max_times(self.config.max_attempts)
891 .with_jitter()
892 }
893
894 fn account_service_client(&self) -> AccountServiceClient<Channel> {
895 AccountServiceClient::new(self.channel.clone())
896 }
897
898 fn basin_service_client(&self) -> BasinServiceClient<Channel> {
899 BasinServiceClient::new(self.channel.clone())
900 }
901
902 pub(crate) fn stream_service_client(&self) -> StreamServiceClient<Channel> {
903 StreamServiceClient::new(self.channel.clone())
904 }
905
906 pub(crate) fn frame_monitoring_stream_service_client(
907 &self,
908 frame_signal: FrameSignal,
909 ) -> StreamServiceClient<RequestFrameMonitor> {
910 StreamServiceClient::new(RequestFrameMonitor::new(self.channel.clone(), frame_signal))
911 }
912}
913
914fn read_resumption_stream(
915 mut request: ReadSessionServiceRequest,
916 mut responses: ServiceStreamingResponse<ReadSessionStreamingResponse>,
917 client: ClientInner,
918) -> impl Send + futures::Stream<Item = Result<types::ReadOutput, ClientError>> {
919 let mut backoff = None;
920 async_stream::stream! {
921 while let Some(item) = responses.next().await {
922 match item {
923 Err(e) if request.should_retry(&e) => {
924 if backoff.is_none() {
925 backoff = Some(client.backoff_builder().build());
926 }
927 if let Some(duration) = backoff.as_mut().and_then(|b| b.next()) {
928 sleep(duration).await;
929 if let Ok(new_responses) = client.send_retryable(request.clone()).await {
930 responses = new_responses;
931 } else {
932 yield Err(e);
933 }
934 } else {
935 yield Err(e);
936 }
937 }
938 item => {
939 if item.is_ok() {
940 backoff = None;
941 }
942 if let Ok(types::ReadOutput::Batch(types::SequencedRecordBatch { records })) = &item {
943 let req = request.req_mut();
944 if let Some(record) = records.last() {
945 req.start = ReadStart::SeqNum(record.seq_num + 1);
946 }
947 if let Some(count) = req.limit.count.as_mut() {
948 *count = count.saturating_sub(records.len() as u64);
949 }
950 if let Some(bytes) = req.limit.bytes.as_mut() {
951 *bytes = bytes.saturating_sub(records.metered_bytes());
952 }
953 }
954 yield item;
955 }
956 }
957 }
958 }
959}