1use std::{env::VarError, fmt::Display, str::FromStr, time::Duration};
35
36use backon::{BackoffBuilder, ConstantBuilder, Retryable};
37use futures::StreamExt;
38use http::{HeaderValue, uri::Authority};
39use hyper_util::client::legacy::connect::HttpConnector;
40use secrecy::SecretString;
41use sync_docs::sync_docs;
42use tokio::{sync::mpsc, time::sleep};
43use tokio_stream::wrappers::ReceiverStream;
44use tonic::{
45 metadata::AsciiMetadataValue,
46 transport::{Channel, ClientTlsConfig, Endpoint},
47};
48use tonic_side_effect::{FrameSignal, RequestFrameMonitor};
49
50use crate::{
51 api::{
52 account_service_client::AccountServiceClient, basin_service_client::BasinServiceClient,
53 stream_service_client::StreamServiceClient,
54 },
55 append_session,
56 service::{
57 ServiceRequest, ServiceStreamingResponse, Streaming,
58 account::{
59 CreateBasinServiceRequest, DeleteBasinServiceRequest, GetBasinConfigServiceRequest,
60 IssueAccessTokenServiceRequest, ListAccessTokensServiceRequest,
61 ListBasinsServiceRequest, ReconfigureBasinServiceRequest,
62 RevokeAccessTokenServiceRequest,
63 },
64 basin::{
65 CreateStreamServiceRequest, DeleteStreamServiceRequest, GetStreamConfigServiceRequest,
66 ListStreamsServiceRequest, ReconfigureStreamServiceRequest,
67 },
68 send_request,
69 stream::{
70 AppendServiceRequest, CheckTailServiceRequest, ReadServiceRequest,
71 ReadSessionServiceRequest, ReadSessionStreamingResponse,
72 },
73 },
74 types::{self, MIB_BYTES, MeteredBytes, ReadStart, StreamPosition},
75};
76
77const DEFAULT_CONNECTOR: Option<HttpConnector> = None;
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub enum S2Cloud {
82 Aws,
84}
85
86impl S2Cloud {
87 const AWS: &'static str = "aws";
88
89 fn as_str(&self) -> &'static str {
90 match self {
91 Self::Aws => Self::AWS,
92 }
93 }
94}
95
96impl Display for S2Cloud {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.write_str(self.as_str())
99 }
100}
101
102impl FromStr for S2Cloud {
103 type Err = String;
104
105 fn from_str(s: &str) -> Result<Self, Self::Err> {
106 if s.eq_ignore_ascii_case(Self::AWS) {
107 Ok(Self::Aws)
108 } else {
109 Err(s.to_owned())
110 }
111 }
112}
113
114#[derive(Debug, Clone)]
116pub enum BasinEndpoint {
117 ParentZone(Authority),
120 Direct(Authority),
124}
125
126#[derive(Debug, Clone)]
132pub struct S2Endpoints {
133 pub account: Authority,
135 pub basin: BasinEndpoint,
137}
138
139#[derive(Debug, Clone)]
141pub enum AppendRetryPolicy {
142 All,
146
147 NoSideEffects,
152}
153
154impl S2Endpoints {
155 pub fn for_cloud(cloud: S2Cloud) -> Self {
157 Self {
158 account: format!("{cloud}.s2.dev")
159 .try_into()
160 .expect("valid authority"),
161 basin: BasinEndpoint::ParentZone(
162 format!("b.{cloud}.s2.dev")
163 .try_into()
164 .expect("valid authority"),
165 ),
166 }
167 }
168
169 pub fn for_cell(
171 cloud: S2Cloud,
172 cell_id: impl Into<String>,
173 ) -> Result<Self, http::uri::InvalidUri> {
174 let cell_endpoint: Authority = format!("{}.o.{cloud}.s2.dev", cell_id.into()).try_into()?;
175 Ok(Self {
176 account: cell_endpoint.clone(),
177 basin: BasinEndpoint::Direct(cell_endpoint),
178 })
179 }
180
181 pub fn from_env() -> Result<Self, String> {
189 let cloud: S2Cloud = std::env::var("S2_CLOUD")
190 .ok()
191 .as_deref()
192 .unwrap_or(S2Cloud::AWS)
193 .parse()
194 .map_err(|cloud| format!("Invalid S2_CLOUD: {cloud}"))?;
195
196 let mut endpoints = Self::for_cloud(cloud);
197
198 match std::env::var("S2_ACCOUNT_ENDPOINT") {
199 Ok(spec) => {
200 endpoints.account = spec
201 .as_str()
202 .try_into()
203 .map_err(|_| format!("Invalid S2_ACCOUNT_ENDPOINT: {spec}"))?;
204 }
205 Err(VarError::NotPresent) => {}
206 Err(VarError::NotUnicode(_)) => {
207 return Err("Invalid S2_ACCOUNT_ENDPOINT: not Unicode".to_owned());
208 }
209 }
210
211 match std::env::var("S2_BASIN_ENDPOINT") {
212 Ok(spec) => {
213 endpoints.basin = if let Some(parent_zone) = spec.strip_prefix("{basin}.") {
214 BasinEndpoint::ParentZone(
215 parent_zone
216 .try_into()
217 .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
218 )
219 } else {
220 BasinEndpoint::Direct(
221 spec.as_str()
222 .try_into()
223 .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
224 )
225 }
226 }
227 Err(VarError::NotPresent) => {}
228 Err(VarError::NotUnicode(_)) => {
229 return Err("Invalid S2_BASIN_ENDPOINT: not Unicode".to_owned());
230 }
231 }
232
233 Ok(endpoints)
234 }
235}
236
237#[derive(Debug, Clone)]
239pub struct ClientConfig {
240 pub(crate) token: SecretString,
241 pub(crate) endpoints: S2Endpoints,
242 pub(crate) connection_timeout: Duration,
243 pub(crate) request_timeout: Duration,
244 pub(crate) user_agent: HeaderValue,
245 pub(crate) append_retry_policy: AppendRetryPolicy,
246 #[cfg(feature = "connector")]
247 pub(crate) uri_scheme: http::uri::Scheme,
248 pub(crate) retry_backoff_duration: Duration,
249 pub(crate) max_attempts: usize,
250 pub(crate) max_append_inflight_bytes: u64,
251 pub(crate) compression: bool,
252}
253
254impl ClientConfig {
255 pub fn new(token: impl Into<String>) -> Self {
257 Self {
258 token: token.into().into(),
259 endpoints: S2Endpoints::for_cloud(S2Cloud::Aws),
260 connection_timeout: Duration::from_secs(3),
261 request_timeout: Duration::from_secs(5),
262 user_agent: "s2-sdk-rust".parse().expect("valid user agent"),
263 append_retry_policy: AppendRetryPolicy::All,
264 #[cfg(feature = "connector")]
265 uri_scheme: http::uri::Scheme::HTTPS,
266 retry_backoff_duration: Duration::from_millis(100),
267 max_attempts: 3,
268 max_append_inflight_bytes: 100 * MIB_BYTES,
269 compression: false,
270 }
271 }
272
273 pub fn with_endpoints(self, host_endpoints: impl Into<S2Endpoints>) -> Self {
275 Self {
276 endpoints: host_endpoints.into(),
277 ..self
278 }
279 }
280
281 pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
283 Self {
284 connection_timeout: connection_timeout.into(),
285 ..self
286 }
287 }
288
289 pub fn with_request_timeout(self, request_timeout: impl Into<Duration>) -> Self {
291 Self {
292 request_timeout: request_timeout.into(),
293 ..self
294 }
295 }
296
297 pub fn with_user_agent(self, user_agent: HeaderValue) -> Self {
299 Self { user_agent, ..self }
300 }
301
302 pub fn with_append_retry_policy(
307 self,
308 append_retry_policy: impl Into<AppendRetryPolicy>,
309 ) -> Self {
310 Self {
311 append_retry_policy: append_retry_policy.into(),
312 ..self
313 }
314 }
315
316 pub fn with_max_append_inflight_bytes(self, max_append_inflight_bytes: u64) -> Self {
321 assert!(
322 max_append_inflight_bytes >= MIB_BYTES,
323 "max_append_inflight_bytes must be at least 1MiB"
324 );
325 Self {
326 max_append_inflight_bytes,
327 ..self
328 }
329 }
330
331 #[cfg(feature = "connector")]
333 pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
334 Self {
335 uri_scheme: uri_scheme.into(),
336 ..self
337 }
338 }
339
340 pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
344 Self {
345 retry_backoff_duration: retry_backoff_duration.into(),
346 ..self
347 }
348 }
349
350 pub fn with_max_attempts(self, max_attempts: usize) -> Self {
354 assert!(max_attempts > 0, "max attempts must be greater than 0");
355 Self {
356 max_attempts,
357 ..self
358 }
359 }
360
361 pub fn with_compression(self, compression: bool) -> Self {
364 Self {
365 compression,
366 ..self
367 }
368 }
369}
370
371#[derive(Debug, Clone, thiserror::Error)]
373pub enum ClientError {
374 #[error(transparent)]
378 Conversion(#[from] types::ConvertError),
379 #[error(transparent)]
381 Service(#[from] tonic::Status),
382}
383
384#[derive(Debug, Clone)]
386pub struct Client {
387 inner: ClientInner,
388}
389
390impl Client {
391 pub fn new(config: ClientConfig) -> Self {
393 Self {
394 inner: ClientInner::new(ClientKind::Account, config, DEFAULT_CONNECTOR),
395 }
396 }
397
398 #[cfg(feature = "connector")]
400 pub fn new_with_connector<C>(config: ClientConfig, connector: C) -> Self
401 where
402 C: tower_service::Service<http::Uri> + Send + 'static,
403 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
404 C::Future: Send,
405 C::Error: std::error::Error + Send + Sync + 'static,
406 {
407 Self {
408 inner: ClientInner::new(ClientKind::Account, config, Some(connector)),
409 }
410 }
411
412 pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
414 BasinClient {
415 inner: self.inner.for_basin(basin),
416 }
417 }
418
419 #[sync_docs]
420 pub async fn list_basins(
421 &self,
422 req: types::ListBasinsRequest,
423 ) -> Result<types::ListBasinsResponse, ClientError> {
424 self.inner
425 .send_retryable(ListBasinsServiceRequest::new(
426 self.inner.account_service_client(),
427 req,
428 ))
429 .await
430 }
431
432 #[sync_docs]
433 pub async fn create_basin(
434 &self,
435 req: types::CreateBasinRequest,
436 ) -> Result<types::BasinInfo, ClientError> {
437 self.inner
438 .send_retryable(CreateBasinServiceRequest::new(
439 self.inner.account_service_client(),
440 req,
441 ))
442 .await
443 }
444
445 #[sync_docs]
446 pub async fn delete_basin(&self, req: types::DeleteBasinRequest) -> Result<(), ClientError> {
447 self.inner
448 .send_retryable(DeleteBasinServiceRequest::new(
449 self.inner.account_service_client(),
450 req,
451 ))
452 .await
453 }
454
455 #[sync_docs]
456 pub async fn get_basin_config(
457 &self,
458 basin: types::BasinName,
459 ) -> Result<types::BasinConfig, ClientError> {
460 self.inner
461 .send_retryable(GetBasinConfigServiceRequest::new(
462 self.inner.account_service_client(),
463 basin,
464 ))
465 .await
466 }
467
468 #[sync_docs]
469 pub async fn reconfigure_basin(
470 &self,
471 req: types::ReconfigureBasinRequest,
472 ) -> Result<types::BasinConfig, ClientError> {
473 self.inner
474 .send_retryable(ReconfigureBasinServiceRequest::new(
475 self.inner.account_service_client(),
476 req,
477 ))
478 .await
479 }
480
481 #[sync_docs]
482 pub async fn issue_access_token(
483 &self,
484 info: types::AccessTokenInfo,
485 ) -> Result<String, ClientError> {
486 self.inner
487 .send_retryable(IssueAccessTokenServiceRequest::new(
488 self.inner.account_service_client(),
489 info,
490 ))
491 .await
492 }
493
494 #[sync_docs]
495 pub async fn revoke_access_token(
496 &self,
497 id: types::AccessTokenId,
498 ) -> Result<types::AccessTokenInfo, ClientError> {
499 self.inner
500 .send_retryable(RevokeAccessTokenServiceRequest::new(
501 self.inner.account_service_client(),
502 id,
503 ))
504 .await
505 }
506
507 #[sync_docs]
508 pub async fn list_access_tokens(
509 &self,
510 req: types::ListAccessTokensRequest,
511 ) -> Result<types::ListAccessTokensResponse, ClientError> {
512 self.inner
513 .send_retryable(ListAccessTokensServiceRequest::new(
514 self.inner.account_service_client(),
515 req,
516 ))
517 .await
518 }
519}
520
521#[derive(Debug, Clone)]
523pub struct BasinClient {
524 inner: ClientInner,
525}
526
527impl BasinClient {
528 pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
530 Self {
531 inner: ClientInner::new(ClientKind::Basin(basin), config, DEFAULT_CONNECTOR),
532 }
533 }
534
535 #[cfg(feature = "connector")]
537 pub fn new_with_connector<C>(
538 config: ClientConfig,
539 basin: types::BasinName,
540 connector: C,
541 ) -> Self
542 where
543 C: tower_service::Service<http::Uri> + Send + 'static,
544 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
545 C::Future: Send,
546 C::Error: std::error::Error + Send + Sync + 'static,
547 {
548 Self {
549 inner: ClientInner::new(ClientKind::Basin(basin), config, Some(connector)),
550 }
551 }
552
553 pub fn stream_client(&self, stream: impl Into<String>) -> StreamClient {
555 StreamClient {
556 inner: self.inner.clone(),
557 stream: stream.into(),
558 }
559 }
560
561 #[sync_docs]
562 pub async fn create_stream(
563 &self,
564 req: types::CreateStreamRequest,
565 ) -> Result<types::StreamInfo, ClientError> {
566 self.inner
567 .send_retryable(CreateStreamServiceRequest::new(
568 self.inner.basin_service_client(),
569 req,
570 ))
571 .await
572 }
573
574 #[sync_docs]
575 pub async fn list_streams(
576 &self,
577 req: types::ListStreamsRequest,
578 ) -> Result<types::ListStreamsResponse, ClientError> {
579 self.inner
580 .send_retryable(ListStreamsServiceRequest::new(
581 self.inner.basin_service_client(),
582 req,
583 ))
584 .await
585 }
586
587 #[sync_docs]
588 pub async fn get_stream_config(
589 &self,
590 stream: impl Into<String>,
591 ) -> Result<types::StreamConfig, ClientError> {
592 self.inner
593 .send_retryable(GetStreamConfigServiceRequest::new(
594 self.inner.basin_service_client(),
595 stream,
596 ))
597 .await
598 }
599
600 #[sync_docs]
601 pub async fn reconfigure_stream(
602 &self,
603 req: types::ReconfigureStreamRequest,
604 ) -> Result<types::StreamConfig, ClientError> {
605 self.inner
606 .send(ReconfigureStreamServiceRequest::new(
607 self.inner.basin_service_client(),
608 req,
609 ))
610 .await
611 }
612
613 #[sync_docs]
614 pub async fn delete_stream(&self, req: types::DeleteStreamRequest) -> Result<(), ClientError> {
615 self.inner
616 .send_retryable(DeleteStreamServiceRequest::new(
617 self.inner.basin_service_client(),
618 req,
619 ))
620 .await
621 }
622}
623
624#[derive(Debug, Clone)]
626pub struct StreamClient {
627 pub(crate) inner: ClientInner,
628 pub(crate) stream: String,
629}
630
631impl StreamClient {
632 pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
634 BasinClient::new(config, basin).stream_client(stream)
635 }
636
637 #[cfg(feature = "connector")]
639 pub fn new_with_connector<C>(
640 config: ClientConfig,
641 basin: types::BasinName,
642 stream: impl Into<String>,
643 connector: C,
644 ) -> Self
645 where
646 C: tower_service::Service<http::Uri> + Send + 'static,
647 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
648 C::Future: Send,
649 C::Error: std::error::Error + Send + Sync + 'static,
650 {
651 BasinClient::new_with_connector(config, basin, connector).stream_client(stream)
652 }
653
654 #[sync_docs]
655 pub async fn check_tail(&self) -> Result<StreamPosition, ClientError> {
656 self.inner
657 .send_retryable(CheckTailServiceRequest::new(
658 self.inner.stream_service_client(),
659 &self.stream,
660 ))
661 .await
662 }
663
664 #[sync_docs]
665 pub async fn read(&self, req: types::ReadRequest) -> Result<types::ReadOutput, ClientError> {
666 self.inner
667 .send_retryable(ReadServiceRequest::new(
668 self.inner.stream_service_client(),
669 &self.stream,
670 req,
671 self.inner.config.compression,
672 ))
673 .await
674 }
675
676 #[sync_docs]
677 pub async fn read_session(
678 &self,
679 req: types::ReadSessionRequest,
680 ) -> Result<Streaming<types::ReadOutput>, ClientError> {
681 let request = ReadSessionServiceRequest::new(
682 self.inner.stream_service_client(),
683 &self.stream,
684 req,
685 self.inner.config.compression,
686 );
687 self.inner
688 .send_retryable(request.clone())
689 .await
690 .map(|responses| {
691 Box::pin(read_resumption_stream(
692 request,
693 responses,
694 self.inner.clone(),
695 )) as _
696 })
697 }
698
699 #[sync_docs]
700 pub async fn append(&self, req: types::AppendInput) -> Result<types::AppendAck, ClientError> {
701 let frame_signal = FrameSignal::new();
702 self.inner
703 .send_retryable(AppendServiceRequest::new(
704 self.inner
705 .frame_monitoring_stream_service_client(frame_signal.clone()),
706 self.inner.config.append_retry_policy.clone(),
707 frame_signal,
708 &self.stream,
709 req,
710 self.inner.config.compression,
711 ))
712 .await
713 }
714
715 #[sync_docs]
716 #[allow(clippy::unused_async)]
717 pub async fn append_session<S>(
718 &self,
719 req: S,
720 ) -> Result<Streaming<types::AppendAck>, ClientError>
721 where
722 S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
723 {
724 let (response_tx, response_rx) = mpsc::channel(10);
725 _ = tokio::spawn(append_session::manage_session(
726 self.clone(),
727 req,
728 response_tx,
729 self.inner.config.compression,
730 ));
731
732 Ok(Box::pin(ReceiverStream::new(response_rx)))
733 }
734}
735
736#[derive(Debug, Clone)]
737enum ClientKind {
738 Account,
739 Basin(types::BasinName),
740}
741
742impl ClientKind {
743 fn to_authority(&self, endpoints: &S2Endpoints) -> Authority {
744 match self {
745 ClientKind::Account => endpoints.account.clone(),
746 ClientKind::Basin(basin) => match &endpoints.basin {
747 BasinEndpoint::ParentZone(zone) => format!("{basin}.{zone}")
748 .try_into()
749 .expect("valid authority as basin pre-validated"),
750 BasinEndpoint::Direct(endpoint) => endpoint.clone(),
751 },
752 }
753 }
754}
755
756#[derive(Debug, Clone)]
757pub(crate) struct ClientInner {
758 kind: ClientKind,
759 channel: Channel,
760 pub(crate) config: ClientConfig,
761}
762
763impl ClientInner {
764 fn new<C>(kind: ClientKind, config: ClientConfig, connector: Option<C>) -> Self
765 where
766 C: tower_service::Service<http::Uri> + Send + 'static,
767 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
768 C::Future: Send,
769 C::Error: std::error::Error + Send + Sync + 'static,
770 {
771 let authority = kind.to_authority(&config.endpoints);
772
773 #[cfg(not(feature = "connector"))]
774 let scheme = "https";
775 #[cfg(feature = "connector")]
776 let scheme = config.uri_scheme.as_str();
777
778 let endpoint = format!("{scheme}://{authority}")
779 .parse::<Endpoint>()
780 .expect("previously validated endpoint scheme and authority")
781 .user_agent(config.user_agent.clone())
782 .expect("converting HeaderValue into HeaderValue")
783 .http2_adaptive_window(true)
784 .tls_config(
785 ClientTlsConfig::default()
786 .with_webpki_roots()
787 .assume_http2(true),
788 )
789 .expect("valid TLS config")
790 .connect_timeout(config.connection_timeout)
791 .timeout(config.request_timeout);
792
793 let channel = if let Some(connector) = connector {
794 assert!(
795 matches!(&config.endpoints.basin, BasinEndpoint::Direct(a) if a == &config.endpoints.account),
796 "Connector only supported when connecting directly to a cell for account as well as basins"
797 );
798 endpoint.connect_with_connector_lazy(connector)
799 } else {
800 endpoint.connect_lazy()
801 };
802
803 Self {
804 kind,
805 channel,
806 config,
807 }
808 }
809
810 fn for_basin(&self, basin: types::BasinName) -> ClientInner {
811 let current_authority = self.kind.to_authority(&self.config.endpoints);
812 let new_kind = ClientKind::Basin(basin);
813 let new_authority = new_kind.to_authority(&self.config.endpoints);
814 if current_authority == new_authority {
815 Self {
816 kind: new_kind,
817 ..self.clone()
818 }
819 } else {
820 Self::new(new_kind, self.config.clone(), DEFAULT_CONNECTOR)
821 }
822 }
823
824 pub(crate) async fn send<T: ServiceRequest>(
825 &self,
826 service_req: T,
827 ) -> Result<T::Response, ClientError> {
828 let basin_header = match (&self.kind, &self.config.endpoints.basin) {
829 (ClientKind::Basin(basin), BasinEndpoint::Direct(_)) => {
830 Some(AsciiMetadataValue::from_str(basin).expect("valid"))
831 }
832 _ => None,
833 };
834 send_request(service_req, &self.config.token, basin_header).await
835 }
836
837 async fn send_retryable_with_backoff<T: ServiceRequest + Clone>(
838 &self,
839 service_req: T,
840 backoff_builder: impl BackoffBuilder,
841 ) -> Result<T::Response, ClientError> {
842 let retry_fn = || async { self.send(service_req.clone()).await };
843
844 retry_fn
845 .retry(backoff_builder)
846 .when(|e| service_req.should_retry(e))
847 .await
848 }
849
850 pub(crate) async fn send_retryable<T: ServiceRequest + Clone>(
851 &self,
852 service_req: T,
853 ) -> Result<T::Response, ClientError> {
854 self.send_retryable_with_backoff(service_req, self.backoff_builder())
855 .await
856 }
857
858 pub(crate) fn backoff_builder(&self) -> impl BackoffBuilder + use<> {
859 ConstantBuilder::default()
860 .with_delay(self.config.retry_backoff_duration)
861 .with_max_times(self.config.max_attempts)
862 .with_jitter()
863 }
864
865 fn account_service_client(&self) -> AccountServiceClient<Channel> {
866 AccountServiceClient::new(self.channel.clone())
867 }
868
869 fn basin_service_client(&self) -> BasinServiceClient<Channel> {
870 BasinServiceClient::new(self.channel.clone())
871 }
872
873 pub(crate) fn stream_service_client(&self) -> StreamServiceClient<Channel> {
874 StreamServiceClient::new(self.channel.clone())
875 }
876
877 pub(crate) fn frame_monitoring_stream_service_client(
878 &self,
879 frame_signal: FrameSignal,
880 ) -> StreamServiceClient<RequestFrameMonitor> {
881 StreamServiceClient::new(RequestFrameMonitor::new(self.channel.clone(), frame_signal))
882 }
883}
884
885fn read_resumption_stream(
886 mut request: ReadSessionServiceRequest,
887 mut responses: ServiceStreamingResponse<ReadSessionStreamingResponse>,
888 client: ClientInner,
889) -> impl Send + futures::Stream<Item = Result<types::ReadOutput, ClientError>> {
890 let mut backoff = None;
891 async_stream::stream! {
892 while let Some(item) = responses.next().await {
893 match item {
894 Err(e) if request.should_retry(&e) => {
895 if backoff.is_none() {
896 backoff = Some(client.backoff_builder().build());
897 }
898 if let Some(duration) = backoff.as_mut().and_then(|b| b.next()) {
899 sleep(duration).await;
900 if let Ok(new_responses) = client.send_retryable(request.clone()).await {
901 responses = new_responses;
902 } else {
903 yield Err(e);
904 }
905 } else {
906 yield Err(e);
907 }
908 }
909 item => {
910 if item.is_ok() {
911 backoff = None;
912 }
913 if let Ok(types::ReadOutput::Batch(types::SequencedRecordBatch { records })) = &item {
914 let req = request.req_mut();
915 if let Some(record) = records.last() {
916 req.start = ReadStart::SeqNum(record.seq_num + 1);
917 }
918 if let Some(count) = req.limit.count.as_mut() {
919 *count = count.saturating_sub(records.len() as u64);
920 }
921 if let Some(bytes) = req.limit.bytes.as_mut() {
922 *bytes = bytes.saturating_sub(records.metered_bytes());
923 }
924 }
925 yield item;
926 }
927 }
928 }
929 }
930}