1use std::{env::VarError, fmt::Display, str::FromStr, time::Duration};
35
36use backon::{BackoffBuilder, ConstantBuilder, Retryable};
37use futures::StreamExt;
38use http::{HeaderValue, uri::Authority};
39use hyper_util::client::legacy::connect::HttpConnector;
40use secrecy::SecretString;
41use sync_docs::sync_docs;
42use tokio::{sync::mpsc, time::sleep};
43use tokio_stream::wrappers::ReceiverStream;
44use tonic::{
45 metadata::AsciiMetadataValue,
46 transport::{Channel, ClientTlsConfig, Endpoint},
47};
48use tonic_side_effect::{FrameSignal, RequestFrameMonitor};
49
50use crate::{
51 api::{
52 account_service_client::AccountServiceClient, basin_service_client::BasinServiceClient,
53 stream_service_client::StreamServiceClient,
54 },
55 append_session,
56 service::{
57 ServiceRequest, ServiceStreamingResponse, Streaming,
58 account::{
59 CreateBasinServiceRequest, DeleteBasinServiceRequest, GetBasinConfigServiceRequest,
60 IssueAccessTokenServiceRequest, ListAccessTokensServiceRequest,
61 ListBasinsServiceRequest, ReconfigureBasinServiceRequest,
62 RevokeAccessTokenServiceRequest,
63 },
64 basin::{
65 CreateStreamServiceRequest, DeleteStreamServiceRequest, GetStreamConfigServiceRequest,
66 ListStreamsServiceRequest, ReconfigureStreamServiceRequest,
67 },
68 send_request,
69 stream::{
70 AppendServiceRequest, CheckTailServiceRequest, ReadServiceRequest,
71 ReadSessionServiceRequest, ReadSessionStreamingResponse,
72 },
73 },
74 types::{self, MIB_BYTES, MeteredBytes},
75};
76
77const DEFAULT_CONNECTOR: Option<HttpConnector> = None;
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub enum S2Cloud {
82 Aws,
84}
85
86impl S2Cloud {
87 const AWS: &'static str = "aws";
88
89 fn as_str(&self) -> &'static str {
90 match self {
91 Self::Aws => Self::AWS,
92 }
93 }
94}
95
96impl Display for S2Cloud {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.write_str(self.as_str())
99 }
100}
101
102impl FromStr for S2Cloud {
103 type Err = String;
104
105 fn from_str(s: &str) -> Result<Self, Self::Err> {
106 if s.eq_ignore_ascii_case(Self::AWS) {
107 Ok(Self::Aws)
108 } else {
109 Err(s.to_owned())
110 }
111 }
112}
113
114#[derive(Debug, Clone)]
116pub enum BasinEndpoint {
117 ParentZone(Authority),
120 Direct(Authority),
124}
125
126#[derive(Debug, Clone)]
132pub struct S2Endpoints {
133 pub account: Authority,
135 pub basin: BasinEndpoint,
137}
138
139#[derive(Debug, Clone)]
141pub enum AppendRetryPolicy {
142 All,
146
147 NoSideEffects,
152}
153
154impl S2Endpoints {
155 pub fn for_cloud(cloud: S2Cloud) -> Self {
157 Self {
158 account: format!("{cloud}.s2.dev")
159 .try_into()
160 .expect("valid authority"),
161 basin: BasinEndpoint::ParentZone(
162 format!("b.{cloud}.s2.dev")
163 .try_into()
164 .expect("valid authority"),
165 ),
166 }
167 }
168
169 pub fn for_cell(
171 cloud: S2Cloud,
172 cell_id: impl Into<String>,
173 ) -> Result<Self, http::uri::InvalidUri> {
174 let cell_endpoint: Authority = format!("{}.o.{cloud}.s2.dev", cell_id.into()).try_into()?;
175 Ok(Self {
176 account: cell_endpoint.clone(),
177 basin: BasinEndpoint::Direct(cell_endpoint),
178 })
179 }
180
181 pub fn from_env() -> Result<Self, String> {
189 let cloud: S2Cloud = std::env::var("S2_CLOUD")
190 .ok()
191 .as_deref()
192 .unwrap_or(S2Cloud::AWS)
193 .parse()
194 .map_err(|cloud| format!("Invalid S2_CLOUD: {cloud}"))?;
195
196 let mut endpoints = Self::for_cloud(cloud);
197
198 match std::env::var("S2_ACCOUNT_ENDPOINT") {
199 Ok(spec) => {
200 endpoints.account = spec
201 .as_str()
202 .try_into()
203 .map_err(|_| format!("Invalid S2_ACCOUNT_ENDPOINT: {spec}"))?;
204 }
205 Err(VarError::NotPresent) => {}
206 Err(VarError::NotUnicode(_)) => {
207 return Err("Invalid S2_ACCOUNT_ENDPOINT: not Unicode".to_owned());
208 }
209 }
210
211 match std::env::var("S2_BASIN_ENDPOINT") {
212 Ok(spec) => {
213 endpoints.basin = if let Some(parent_zone) = spec.strip_prefix("{basin}.") {
214 BasinEndpoint::ParentZone(
215 parent_zone
216 .try_into()
217 .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
218 )
219 } else {
220 BasinEndpoint::Direct(
221 spec.as_str()
222 .try_into()
223 .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
224 )
225 }
226 }
227 Err(VarError::NotPresent) => {}
228 Err(VarError::NotUnicode(_)) => {
229 return Err("Invalid S2_BASIN_ENDPOINT: not Unicode".to_owned());
230 }
231 }
232
233 Ok(endpoints)
234 }
235}
236
237#[derive(Debug, Clone)]
239pub struct ClientConfig {
240 pub(crate) token: SecretString,
241 pub(crate) endpoints: S2Endpoints,
242 pub(crate) connection_timeout: Duration,
243 pub(crate) request_timeout: Duration,
244 pub(crate) user_agent: HeaderValue,
245 pub(crate) append_retry_policy: AppendRetryPolicy,
246 #[cfg(feature = "connector")]
247 pub(crate) uri_scheme: http::uri::Scheme,
248 pub(crate) retry_backoff_duration: Duration,
249 pub(crate) max_attempts: usize,
250 pub(crate) max_append_inflight_bytes: u64,
251 pub(crate) compression: bool,
252}
253
254impl ClientConfig {
255 pub fn new(token: impl Into<String>) -> Self {
257 Self {
258 token: token.into().into(),
259 endpoints: S2Endpoints::for_cloud(S2Cloud::Aws),
260 connection_timeout: Duration::from_secs(3),
261 request_timeout: Duration::from_secs(5),
262 user_agent: "s2-sdk-rust".parse().expect("valid user agent"),
263 append_retry_policy: AppendRetryPolicy::All,
264 #[cfg(feature = "connector")]
265 uri_scheme: http::uri::Scheme::HTTPS,
266 retry_backoff_duration: Duration::from_millis(100),
267 max_attempts: 3,
268 max_append_inflight_bytes: 100 * MIB_BYTES,
269 compression: false,
270 }
271 }
272
273 pub fn with_endpoints(self, host_endpoints: impl Into<S2Endpoints>) -> Self {
275 Self {
276 endpoints: host_endpoints.into(),
277 ..self
278 }
279 }
280
281 pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
283 Self {
284 connection_timeout: connection_timeout.into(),
285 ..self
286 }
287 }
288
289 pub fn with_request_timeout(self, request_timeout: impl Into<Duration>) -> Self {
291 Self {
292 request_timeout: request_timeout.into(),
293 ..self
294 }
295 }
296
297 pub fn with_user_agent(self, user_agent: HeaderValue) -> Self {
299 Self { user_agent, ..self }
300 }
301
302 pub fn with_append_retry_policy(
307 self,
308 append_retry_policy: impl Into<AppendRetryPolicy>,
309 ) -> Self {
310 Self {
311 append_retry_policy: append_retry_policy.into(),
312 ..self
313 }
314 }
315
316 pub fn with_max_append_inflight_bytes(self, max_append_inflight_bytes: u64) -> Self {
321 assert!(
322 max_append_inflight_bytes >= MIB_BYTES,
323 "max_append_inflight_bytes must be at least 1MiB"
324 );
325 Self {
326 max_append_inflight_bytes,
327 ..self
328 }
329 }
330
331 #[cfg(feature = "connector")]
333 pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
334 Self {
335 uri_scheme: uri_scheme.into(),
336 ..self
337 }
338 }
339
340 pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
344 Self {
345 retry_backoff_duration: retry_backoff_duration.into(),
346 ..self
347 }
348 }
349
350 pub fn with_max_attempts(self, max_attempts: usize) -> Self {
354 assert!(max_attempts > 0, "max attempts must be greater than 0");
355 Self {
356 max_attempts,
357 ..self
358 }
359 }
360
361 pub fn with_compression(self, compression: bool) -> Self {
364 Self {
365 compression,
366 ..self
367 }
368 }
369}
370
371#[derive(Debug, Clone, thiserror::Error)]
373pub enum ClientError {
374 #[error(transparent)]
378 Conversion(#[from] types::ConvertError),
379 #[error(transparent)]
381 Service(#[from] tonic::Status),
382}
383
384#[derive(Debug, Clone)]
386pub struct Client {
387 inner: ClientInner,
388}
389
390impl Client {
391 pub fn new(config: ClientConfig) -> Self {
393 Self {
394 inner: ClientInner::new(ClientKind::Account, config, DEFAULT_CONNECTOR),
395 }
396 }
397
398 #[cfg(feature = "connector")]
400 pub fn new_with_connector<C>(config: ClientConfig, connector: C) -> Self
401 where
402 C: tower_service::Service<http::Uri> + Send + 'static,
403 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
404 C::Future: Send,
405 C::Error: std::error::Error + Send + Sync + 'static,
406 {
407 Self {
408 inner: ClientInner::new(ClientKind::Account, config, Some(connector)),
409 }
410 }
411
412 pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
414 BasinClient {
415 inner: self.inner.for_basin(basin),
416 }
417 }
418
419 #[sync_docs]
420 pub async fn list_basins(
421 &self,
422 req: types::ListBasinsRequest,
423 ) -> Result<types::ListBasinsResponse, ClientError> {
424 self.inner
425 .send_retryable(ListBasinsServiceRequest::new(
426 self.inner.account_service_client(),
427 req,
428 ))
429 .await
430 }
431
432 #[sync_docs]
433 pub async fn create_basin(
434 &self,
435 req: types::CreateBasinRequest,
436 ) -> Result<types::BasinInfo, ClientError> {
437 self.inner
438 .send_retryable(CreateBasinServiceRequest::new(
439 self.inner.account_service_client(),
440 req,
441 ))
442 .await
443 }
444
445 #[sync_docs]
446 pub async fn delete_basin(&self, req: types::DeleteBasinRequest) -> Result<(), ClientError> {
447 self.inner
448 .send_retryable(DeleteBasinServiceRequest::new(
449 self.inner.account_service_client(),
450 req,
451 ))
452 .await
453 }
454
455 #[sync_docs]
456 pub async fn get_basin_config(
457 &self,
458 basin: types::BasinName,
459 ) -> Result<types::BasinConfig, ClientError> {
460 self.inner
461 .send_retryable(GetBasinConfigServiceRequest::new(
462 self.inner.account_service_client(),
463 basin,
464 ))
465 .await
466 }
467
468 #[sync_docs]
469 pub async fn reconfigure_basin(
470 &self,
471 req: types::ReconfigureBasinRequest,
472 ) -> Result<types::BasinConfig, ClientError> {
473 self.inner
474 .send_retryable(ReconfigureBasinServiceRequest::new(
475 self.inner.account_service_client(),
476 req,
477 ))
478 .await
479 }
480
481 #[sync_docs]
482 pub async fn issue_access_token(
483 &self,
484 info: types::AccessTokenInfo,
485 ) -> Result<String, ClientError> {
486 self.inner
487 .send_retryable(IssueAccessTokenServiceRequest::new(
488 self.inner.account_service_client(),
489 info,
490 ))
491 .await
492 }
493
494 #[sync_docs]
495 pub async fn revoke_access_token(
496 &self,
497 id: types::AccessTokenId,
498 ) -> Result<types::AccessTokenInfo, ClientError> {
499 self.inner
500 .send_retryable(RevokeAccessTokenServiceRequest::new(
501 self.inner.account_service_client(),
502 id,
503 ))
504 .await
505 }
506
507 #[sync_docs]
508 pub async fn list_access_tokens(
509 &self,
510 req: types::ListAccessTokensRequest,
511 ) -> Result<types::ListAccessTokensResponse, ClientError> {
512 self.inner
513 .send_retryable(ListAccessTokensServiceRequest::new(
514 self.inner.account_service_client(),
515 req,
516 ))
517 .await
518 }
519}
520
521#[derive(Debug, Clone)]
523pub struct BasinClient {
524 inner: ClientInner,
525}
526
527impl BasinClient {
528 pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
530 Self {
531 inner: ClientInner::new(ClientKind::Basin(basin), config, DEFAULT_CONNECTOR),
532 }
533 }
534
535 #[cfg(feature = "connector")]
537 pub fn new_with_connector<C>(
538 config: ClientConfig,
539 basin: types::BasinName,
540 connector: C,
541 ) -> Self
542 where
543 C: tower_service::Service<http::Uri> + Send + 'static,
544 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
545 C::Future: Send,
546 C::Error: std::error::Error + Send + Sync + 'static,
547 {
548 Self {
549 inner: ClientInner::new(ClientKind::Basin(basin), config, Some(connector)),
550 }
551 }
552
553 pub fn stream_client(&self, stream: impl Into<String>) -> StreamClient {
555 StreamClient {
556 inner: self.inner.clone(),
557 stream: stream.into(),
558 }
559 }
560
561 #[sync_docs]
562 pub async fn create_stream(
563 &self,
564 req: types::CreateStreamRequest,
565 ) -> Result<types::StreamInfo, ClientError> {
566 self.inner
567 .send_retryable(CreateStreamServiceRequest::new(
568 self.inner.basin_service_client(),
569 req,
570 ))
571 .await
572 }
573
574 #[sync_docs]
575 pub async fn list_streams(
576 &self,
577 req: types::ListStreamsRequest,
578 ) -> Result<types::ListStreamsResponse, ClientError> {
579 self.inner
580 .send_retryable(ListStreamsServiceRequest::new(
581 self.inner.basin_service_client(),
582 req,
583 ))
584 .await
585 }
586
587 #[sync_docs]
588 pub async fn get_stream_config(
589 &self,
590 stream: impl Into<String>,
591 ) -> Result<types::StreamConfig, ClientError> {
592 self.inner
593 .send_retryable(GetStreamConfigServiceRequest::new(
594 self.inner.basin_service_client(),
595 stream,
596 ))
597 .await
598 }
599
600 #[sync_docs]
601 pub async fn reconfigure_stream(
602 &self,
603 req: types::ReconfigureStreamRequest,
604 ) -> Result<types::StreamConfig, ClientError> {
605 self.inner
606 .send(ReconfigureStreamServiceRequest::new(
607 self.inner.basin_service_client(),
608 req,
609 ))
610 .await
611 }
612
613 #[sync_docs]
614 pub async fn delete_stream(&self, req: types::DeleteStreamRequest) -> Result<(), ClientError> {
615 self.inner
616 .send_retryable(DeleteStreamServiceRequest::new(
617 self.inner.basin_service_client(),
618 req,
619 ))
620 .await
621 }
622}
623
624#[derive(Debug, Clone)]
626pub struct StreamClient {
627 pub(crate) inner: ClientInner,
628 pub(crate) stream: String,
629}
630
631impl StreamClient {
632 pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
634 BasinClient::new(config, basin).stream_client(stream)
635 }
636
637 #[cfg(feature = "connector")]
639 pub fn new_with_connector<C>(
640 config: ClientConfig,
641 basin: types::BasinName,
642 stream: impl Into<String>,
643 connector: C,
644 ) -> Self
645 where
646 C: tower_service::Service<http::Uri> + Send + 'static,
647 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
648 C::Future: Send,
649 C::Error: std::error::Error + Send + Sync + 'static,
650 {
651 BasinClient::new_with_connector(config, basin, connector).stream_client(stream)
652 }
653
654 #[sync_docs]
655 pub async fn check_tail(&self) -> Result<u64, ClientError> {
656 self.inner
657 .send_retryable(CheckTailServiceRequest::new(
658 self.inner.stream_service_client(),
659 &self.stream,
660 ))
661 .await
662 }
663
664 #[sync_docs]
665 pub async fn read(&self, req: types::ReadRequest) -> Result<types::ReadOutput, ClientError> {
666 self.inner
667 .send_retryable(ReadServiceRequest::new(
668 self.inner.stream_service_client(),
669 &self.stream,
670 req,
671 self.inner.config.compression,
672 ))
673 .await
674 }
675
676 #[sync_docs]
677 pub async fn read_session(
678 &self,
679 req: types::ReadSessionRequest,
680 ) -> Result<Streaming<types::ReadOutput>, ClientError> {
681 let request = ReadSessionServiceRequest::new(
682 self.inner.stream_service_client(),
683 &self.stream,
684 req,
685 self.inner.config.compression,
686 );
687 self.inner
688 .send_retryable(request.clone())
689 .await
690 .map(|responses| {
691 Box::pin(read_resumption_stream(
692 request,
693 responses,
694 self.inner.clone(),
695 )) as _
696 })
697 }
698
699 #[sync_docs]
700 pub async fn append(
701 &self,
702 req: types::AppendInput,
703 ) -> Result<types::AppendOutput, ClientError> {
704 let frame_signal = FrameSignal::new();
705 self.inner
706 .send_retryable(AppendServiceRequest::new(
707 self.inner
708 .frame_monitoring_stream_service_client(frame_signal.clone()),
709 self.inner.config.append_retry_policy.clone(),
710 frame_signal,
711 &self.stream,
712 req,
713 self.inner.config.compression,
714 ))
715 .await
716 }
717
718 #[sync_docs]
719 #[allow(clippy::unused_async)]
720 pub async fn append_session<S>(
721 &self,
722 req: S,
723 ) -> Result<Streaming<types::AppendOutput>, ClientError>
724 where
725 S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
726 {
727 let (response_tx, response_rx) = mpsc::channel(10);
728 _ = tokio::spawn(append_session::manage_session(
729 self.clone(),
730 req,
731 response_tx,
732 self.inner.config.compression,
733 ));
734
735 Ok(Box::pin(ReceiverStream::new(response_rx)))
736 }
737}
738
739#[derive(Debug, Clone)]
740enum ClientKind {
741 Account,
742 Basin(types::BasinName),
743}
744
745impl ClientKind {
746 fn to_authority(&self, endpoints: &S2Endpoints) -> Authority {
747 match self {
748 ClientKind::Account => endpoints.account.clone(),
749 ClientKind::Basin(basin) => match &endpoints.basin {
750 BasinEndpoint::ParentZone(zone) => format!("{basin}.{zone}")
751 .try_into()
752 .expect("valid authority as basin pre-validated"),
753 BasinEndpoint::Direct(endpoint) => endpoint.clone(),
754 },
755 }
756 }
757}
758
759#[derive(Debug, Clone)]
760pub(crate) struct ClientInner {
761 kind: ClientKind,
762 channel: Channel,
763 pub(crate) config: ClientConfig,
764}
765
766impl ClientInner {
767 fn new<C>(kind: ClientKind, config: ClientConfig, connector: Option<C>) -> Self
768 where
769 C: tower_service::Service<http::Uri> + Send + 'static,
770 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
771 C::Future: Send,
772 C::Error: std::error::Error + Send + Sync + 'static,
773 {
774 let authority = kind.to_authority(&config.endpoints);
775
776 #[cfg(not(feature = "connector"))]
777 let scheme = "https";
778 #[cfg(feature = "connector")]
779 let scheme = config.uri_scheme.as_str();
780
781 let endpoint = format!("{scheme}://{authority}")
782 .parse::<Endpoint>()
783 .expect("previously validated endpoint scheme and authority")
784 .user_agent(config.user_agent.clone())
785 .expect("converting HeaderValue into HeaderValue")
786 .http2_adaptive_window(true)
787 .tls_config(
788 ClientTlsConfig::default()
789 .with_webpki_roots()
790 .assume_http2(true),
791 )
792 .expect("valid TLS config")
793 .connect_timeout(config.connection_timeout)
794 .timeout(config.request_timeout);
795
796 let channel = if let Some(connector) = connector {
797 assert!(
798 matches!(&config.endpoints.basin, BasinEndpoint::Direct(a) if a == &config.endpoints.account),
799 "Connector only supported when connecting directly to a cell for account as well as basins"
800 );
801 endpoint.connect_with_connector_lazy(connector)
802 } else {
803 endpoint.connect_lazy()
804 };
805
806 Self {
807 kind,
808 channel,
809 config,
810 }
811 }
812
813 fn for_basin(&self, basin: types::BasinName) -> ClientInner {
814 let current_authority = self.kind.to_authority(&self.config.endpoints);
815 let new_kind = ClientKind::Basin(basin);
816 let new_authority = new_kind.to_authority(&self.config.endpoints);
817 if current_authority == new_authority {
818 Self {
819 kind: new_kind,
820 ..self.clone()
821 }
822 } else {
823 Self::new(new_kind, self.config.clone(), DEFAULT_CONNECTOR)
824 }
825 }
826
827 pub(crate) async fn send<T: ServiceRequest>(
828 &self,
829 service_req: T,
830 ) -> Result<T::Response, ClientError> {
831 let basin_header = match (&self.kind, &self.config.endpoints.basin) {
832 (ClientKind::Basin(basin), BasinEndpoint::Direct(_)) => {
833 Some(AsciiMetadataValue::from_str(basin).expect("valid"))
834 }
835 _ => None,
836 };
837 send_request(service_req, &self.config.token, basin_header).await
838 }
839
840 async fn send_retryable_with_backoff<T: ServiceRequest + Clone>(
841 &self,
842 service_req: T,
843 backoff_builder: impl BackoffBuilder,
844 ) -> Result<T::Response, ClientError> {
845 let retry_fn = || async { self.send(service_req.clone()).await };
846
847 retry_fn
848 .retry(backoff_builder)
849 .when(|e| service_req.should_retry(e))
850 .await
851 }
852
853 pub(crate) async fn send_retryable<T: ServiceRequest + Clone>(
854 &self,
855 service_req: T,
856 ) -> Result<T::Response, ClientError> {
857 self.send_retryable_with_backoff(service_req, self.backoff_builder())
858 .await
859 }
860
861 pub(crate) fn backoff_builder(&self) -> impl BackoffBuilder + use<> {
862 ConstantBuilder::default()
863 .with_delay(self.config.retry_backoff_duration)
864 .with_max_times(self.config.max_attempts)
865 .with_jitter()
866 }
867
868 fn account_service_client(&self) -> AccountServiceClient<Channel> {
869 AccountServiceClient::new(self.channel.clone())
870 }
871
872 fn basin_service_client(&self) -> BasinServiceClient<Channel> {
873 BasinServiceClient::new(self.channel.clone())
874 }
875
876 pub(crate) fn stream_service_client(&self) -> StreamServiceClient<Channel> {
877 StreamServiceClient::new(self.channel.clone())
878 }
879
880 pub(crate) fn frame_monitoring_stream_service_client(
881 &self,
882 frame_signal: FrameSignal,
883 ) -> StreamServiceClient<RequestFrameMonitor> {
884 StreamServiceClient::new(RequestFrameMonitor::new(self.channel.clone(), frame_signal))
885 }
886}
887
888fn read_resumption_stream(
889 mut request: ReadSessionServiceRequest,
890 mut responses: ServiceStreamingResponse<ReadSessionStreamingResponse>,
891 client: ClientInner,
892) -> impl Send + futures::Stream<Item = Result<types::ReadOutput, ClientError>> {
893 let mut backoff = None;
894 async_stream::stream! {
895 while let Some(item) = responses.next().await {
896 match item {
897 Err(e) if request.should_retry(&e) => {
898 if backoff.is_none() {
899 backoff = Some(client.backoff_builder().build());
900 }
901 if let Some(duration) = backoff.as_mut().and_then(|b| b.next()) {
902 sleep(duration).await;
903 if let Ok(new_responses) = client.send_retryable(request.clone()).await {
904 responses = new_responses;
905 } else {
906 yield Err(e);
907 }
908 } else {
909 yield Err(e);
910 }
911 }
912 item => {
913 if item.is_ok() {
914 backoff = None;
915 }
916 if let Ok(types::ReadOutput::Batch(types::SequencedRecordBatch { records })) = &item {
917 let req = request.req_mut();
918 if let Some(record) = records.last() {
919 req.start_seq_num = record.seq_num + 1;
920 }
921 if let Some(count) = req.limit.count.as_mut() {
922 *count = count.saturating_sub(records.len() as u64);
923 }
924 if let Some(bytes) = req.limit.bytes.as_mut() {
925 *bytes = bytes.saturating_sub(records.metered_bytes());
926 }
927 }
928 yield item;
929 }
930 }
931 }
932 }
933}