1use std::{env::VarError, fmt::Display, str::FromStr, time::Duration};
35
36use backon::{BackoffBuilder, ConstantBuilder, Retryable};
37use futures::StreamExt;
38use http::{HeaderValue, uri::Authority};
39use hyper_util::client::legacy::connect::HttpConnector;
40use secrecy::SecretString;
41use sync_docs::sync_docs;
42use tokio::{sync::mpsc, time::sleep};
43use tokio_stream::wrappers::ReceiverStream;
44use tonic::{
45 metadata::AsciiMetadataValue,
46 transport::{Channel, ClientTlsConfig, Endpoint},
47};
48use tonic_side_effect::{FrameSignal, RequestFrameMonitor};
49
50use crate::{
51 api::{
52 account_service_client::AccountServiceClient, basin_service_client::BasinServiceClient,
53 stream_service_client::StreamServiceClient,
54 },
55 append_session,
56 service::{
57 ServiceRequest, ServiceStreamingResponse, Streaming,
58 account::{
59 CreateBasinServiceRequest, DeleteBasinServiceRequest, GetBasinConfigServiceRequest,
60 ListBasinsServiceRequest, ReconfigureBasinServiceRequest,
61 },
62 basin::{
63 CreateStreamServiceRequest, DeleteStreamServiceRequest, GetStreamConfigServiceRequest,
64 ListStreamsServiceRequest, ReconfigureStreamServiceRequest,
65 },
66 send_request,
67 stream::{
68 AppendServiceRequest, CheckTailServiceRequest, ReadServiceRequest,
69 ReadSessionServiceRequest, ReadSessionStreamingResponse,
70 },
71 },
72 types::{self, MIB_BYTES, MeteredBytes},
73};
74
75const DEFAULT_CONNECTOR: Option<HttpConnector> = None;
76
/// Cloud in which the S2 service is reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum S2Cloud {
    /// The cloud identified by the string `aws`.
    Aws,
}

impl S2Cloud {
    /// Canonical identifier of [`S2Cloud::Aws`].
    const AWS: &'static str = "aws";

    /// Returns the canonical identifier of this cloud.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Aws => Self::AWS,
        }
    }
}

impl Display for S2Cloud {
    /// Writes the canonical identifier verbatim; width and fill flags of the
    /// formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}

impl FromStr for S2Cloud {
    /// The rejected input, returned unchanged.
    type Err = String;

    /// Parses a cloud identifier, ignoring ASCII case.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        s.eq_ignore_ascii_case(Self::AWS)
            .then_some(Self::Aws)
            .ok_or_else(|| s.to_owned())
    }
}
111
112#[derive(Debug, Clone)]
114pub enum BasinEndpoint {
115 ParentZone(Authority),
118 Direct(Authority),
122}
123
124#[derive(Debug, Clone)]
130pub struct S2Endpoints {
131 pub account: Authority,
133 pub basin: BasinEndpoint,
135}
136
/// Retry policy selected for append requests.
///
/// The chosen value is handed to the append service request; how each variant
/// is interpreted is decided there rather than in this file.
#[derive(Debug, Clone)]
pub enum AppendRetryPolicy {
    /// Presumably: retry any retryable failure — confirm against the append
    /// request's retry logic.
    All,

    /// Presumably: retry only while the request cannot have had side effects —
    /// confirm against the append request's retry logic.
    NoSideEffects,
}
151
152impl S2Endpoints {
153 pub fn for_cloud(cloud: S2Cloud) -> Self {
155 Self {
156 account: format!("{cloud}.s2.dev")
157 .try_into()
158 .expect("valid authority"),
159 basin: BasinEndpoint::ParentZone(
160 format!("b.{cloud}.s2.dev")
161 .try_into()
162 .expect("valid authority"),
163 ),
164 }
165 }
166
167 pub fn for_cell(
169 cloud: S2Cloud,
170 cell_id: impl Into<String>,
171 ) -> Result<Self, http::uri::InvalidUri> {
172 let cell_endpoint: Authority = format!("{}.o.{cloud}.s2.dev", cell_id.into()).try_into()?;
173 Ok(Self {
174 account: cell_endpoint.clone(),
175 basin: BasinEndpoint::Direct(cell_endpoint),
176 })
177 }
178
179 pub fn from_env() -> Result<Self, String> {
187 let cloud: S2Cloud = std::env::var("S2_CLOUD")
188 .ok()
189 .as_deref()
190 .unwrap_or(S2Cloud::AWS)
191 .parse()
192 .map_err(|cloud| format!("Invalid S2_CLOUD: {cloud}"))?;
193
194 let mut endpoints = Self::for_cloud(cloud);
195
196 match std::env::var("S2_ACCOUNT_ENDPOINT") {
197 Ok(spec) => {
198 endpoints.account = spec
199 .as_str()
200 .try_into()
201 .map_err(|_| format!("Invalid S2_ACCOUNT_ENDPOINT: {spec}"))?;
202 }
203 Err(VarError::NotPresent) => {}
204 Err(VarError::NotUnicode(_)) => {
205 return Err("Invalid S2_ACCOUNT_ENDPOINT: not Unicode".to_owned());
206 }
207 }
208
209 match std::env::var("S2_BASIN_ENDPOINT") {
210 Ok(spec) => {
211 endpoints.basin = if let Some(parent_zone) = spec.strip_prefix("{basin}.") {
212 BasinEndpoint::ParentZone(
213 parent_zone
214 .try_into()
215 .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
216 )
217 } else {
218 BasinEndpoint::Direct(
219 spec.as_str()
220 .try_into()
221 .map_err(|e| format!("Invalid S2_BASIN_ENDPOINT ({e}): {spec}"))?,
222 )
223 }
224 }
225 Err(VarError::NotPresent) => {}
226 Err(VarError::NotUnicode(_)) => {
227 return Err("Invalid S2_BASIN_ENDPOINT: not Unicode".to_owned());
228 }
229 }
230
231 Ok(endpoints)
232 }
233}
234
235#[derive(Debug, Clone)]
237pub struct ClientConfig {
238 pub(crate) token: SecretString,
239 pub(crate) endpoints: S2Endpoints,
240 pub(crate) connection_timeout: Duration,
241 pub(crate) request_timeout: Duration,
242 pub(crate) user_agent: HeaderValue,
243 pub(crate) append_retry_policy: AppendRetryPolicy,
244 #[cfg(feature = "connector")]
245 pub(crate) uri_scheme: http::uri::Scheme,
246 pub(crate) retry_backoff_duration: Duration,
247 pub(crate) max_attempts: usize,
248 pub(crate) max_append_inflight_bytes: u64,
249 pub(crate) compression: bool,
250}
251
252impl ClientConfig {
253 pub fn new(token: impl Into<String>) -> Self {
255 Self {
256 token: token.into().into(),
257 endpoints: S2Endpoints::for_cloud(S2Cloud::Aws),
258 connection_timeout: Duration::from_secs(3),
259 request_timeout: Duration::from_secs(5),
260 user_agent: "s2-sdk-rust".parse().expect("valid user agent"),
261 append_retry_policy: AppendRetryPolicy::All,
262 #[cfg(feature = "connector")]
263 uri_scheme: http::uri::Scheme::HTTPS,
264 retry_backoff_duration: Duration::from_millis(100),
265 max_attempts: 3,
266 max_append_inflight_bytes: 100 * MIB_BYTES,
267 compression: false,
268 }
269 }
270
271 pub fn with_endpoints(self, host_endpoints: impl Into<S2Endpoints>) -> Self {
273 Self {
274 endpoints: host_endpoints.into(),
275 ..self
276 }
277 }
278
279 pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
281 Self {
282 connection_timeout: connection_timeout.into(),
283 ..self
284 }
285 }
286
287 pub fn with_request_timeout(self, request_timeout: impl Into<Duration>) -> Self {
289 Self {
290 request_timeout: request_timeout.into(),
291 ..self
292 }
293 }
294
295 pub fn with_user_agent(self, user_agent: HeaderValue) -> Self {
297 Self { user_agent, ..self }
298 }
299
300 pub fn with_append_retry_policy(
305 self,
306 append_retry_policy: impl Into<AppendRetryPolicy>,
307 ) -> Self {
308 Self {
309 append_retry_policy: append_retry_policy.into(),
310 ..self
311 }
312 }
313
314 pub fn with_max_append_inflight_bytes(self, max_append_inflight_bytes: u64) -> Self {
319 assert!(
320 max_append_inflight_bytes >= MIB_BYTES,
321 "max_append_inflight_bytes must be at least 1MiB"
322 );
323 Self {
324 max_append_inflight_bytes,
325 ..self
326 }
327 }
328
329 #[cfg(feature = "connector")]
331 pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
332 Self {
333 uri_scheme: uri_scheme.into(),
334 ..self
335 }
336 }
337
338 pub fn with_retry_backoff_duration(self, retry_backoff_duration: impl Into<Duration>) -> Self {
342 Self {
343 retry_backoff_duration: retry_backoff_duration.into(),
344 ..self
345 }
346 }
347
348 pub fn with_max_attempts(self, max_attempts: usize) -> Self {
352 assert!(max_attempts > 0, "max attempts must be greater than 0");
353 Self {
354 max_attempts,
355 ..self
356 }
357 }
358
359 pub fn with_compression(self, compression: bool) -> Self {
362 Self {
363 compression,
364 ..self
365 }
366 }
367}
368
369#[derive(Debug, Clone, thiserror::Error)]
371pub enum ClientError {
372 #[error(transparent)]
376 Conversion(#[from] types::ConvertError),
377 #[error(transparent)]
379 Service(#[from] tonic::Status),
380}
381
382#[derive(Debug, Clone)]
384pub struct Client {
385 inner: ClientInner,
386}
387
388impl Client {
389 pub fn new(config: ClientConfig) -> Self {
391 Self {
392 inner: ClientInner::new(ClientKind::Account, config, DEFAULT_CONNECTOR),
393 }
394 }
395
396 #[cfg(feature = "connector")]
398 pub fn new_with_connector<C>(config: ClientConfig, connector: C) -> Self
399 where
400 C: tower_service::Service<http::Uri> + Send + 'static,
401 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
402 C::Future: Send,
403 C::Error: std::error::Error + Send + Sync + 'static,
404 {
405 Self {
406 inner: ClientInner::new(ClientKind::Account, config, Some(connector)),
407 }
408 }
409
410 pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
412 BasinClient {
413 inner: self.inner.for_basin(basin),
414 }
415 }
416
417 #[sync_docs]
418 pub async fn list_basins(
419 &self,
420 req: types::ListBasinsRequest,
421 ) -> Result<types::ListBasinsResponse, ClientError> {
422 self.inner
423 .send_retryable(ListBasinsServiceRequest::new(
424 self.inner.account_service_client(),
425 req,
426 ))
427 .await
428 }
429
430 #[sync_docs]
431 pub async fn create_basin(
432 &self,
433 req: types::CreateBasinRequest,
434 ) -> Result<types::BasinInfo, ClientError> {
435 self.inner
436 .send_retryable(CreateBasinServiceRequest::new(
437 self.inner.account_service_client(),
438 req,
439 ))
440 .await
441 }
442
443 #[sync_docs]
444 pub async fn delete_basin(&self, req: types::DeleteBasinRequest) -> Result<(), ClientError> {
445 self.inner
446 .send_retryable(DeleteBasinServiceRequest::new(
447 self.inner.account_service_client(),
448 req,
449 ))
450 .await
451 }
452
453 #[sync_docs]
454 pub async fn get_basin_config(
455 &self,
456 basin: types::BasinName,
457 ) -> Result<types::BasinConfig, ClientError> {
458 self.inner
459 .send_retryable(GetBasinConfigServiceRequest::new(
460 self.inner.account_service_client(),
461 basin,
462 ))
463 .await
464 }
465
466 #[sync_docs]
467 pub async fn reconfigure_basin(
468 &self,
469 req: types::ReconfigureBasinRequest,
470 ) -> Result<types::BasinConfig, ClientError> {
471 self.inner
472 .send_retryable(ReconfigureBasinServiceRequest::new(
473 self.inner.account_service_client(),
474 req,
475 ))
476 .await
477 }
478}
479
480#[derive(Debug, Clone)]
482pub struct BasinClient {
483 inner: ClientInner,
484}
485
486impl BasinClient {
487 pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
489 Self {
490 inner: ClientInner::new(ClientKind::Basin(basin), config, DEFAULT_CONNECTOR),
491 }
492 }
493
494 #[cfg(feature = "connector")]
496 pub fn new_with_connector<C>(
497 config: ClientConfig,
498 basin: types::BasinName,
499 connector: C,
500 ) -> Self
501 where
502 C: tower_service::Service<http::Uri> + Send + 'static,
503 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
504 C::Future: Send,
505 C::Error: std::error::Error + Send + Sync + 'static,
506 {
507 Self {
508 inner: ClientInner::new(ClientKind::Basin(basin), config, Some(connector)),
509 }
510 }
511
512 pub fn stream_client(&self, stream: impl Into<String>) -> StreamClient {
514 StreamClient {
515 inner: self.inner.clone(),
516 stream: stream.into(),
517 }
518 }
519
520 #[sync_docs]
521 pub async fn create_stream(
522 &self,
523 req: types::CreateStreamRequest,
524 ) -> Result<types::StreamInfo, ClientError> {
525 self.inner
526 .send_retryable(CreateStreamServiceRequest::new(
527 self.inner.basin_service_client(),
528 req,
529 ))
530 .await
531 }
532
533 #[sync_docs]
534 pub async fn list_streams(
535 &self,
536 req: types::ListStreamsRequest,
537 ) -> Result<types::ListStreamsResponse, ClientError> {
538 self.inner
539 .send_retryable(ListStreamsServiceRequest::new(
540 self.inner.basin_service_client(),
541 req,
542 ))
543 .await
544 }
545
546 #[sync_docs]
547 pub async fn get_stream_config(
548 &self,
549 stream: impl Into<String>,
550 ) -> Result<types::StreamConfig, ClientError> {
551 self.inner
552 .send_retryable(GetStreamConfigServiceRequest::new(
553 self.inner.basin_service_client(),
554 stream,
555 ))
556 .await
557 }
558
559 #[sync_docs]
560 pub async fn reconfigure_stream(
561 &self,
562 req: types::ReconfigureStreamRequest,
563 ) -> Result<types::StreamConfig, ClientError> {
564 self.inner
565 .send(ReconfigureStreamServiceRequest::new(
566 self.inner.basin_service_client(),
567 req,
568 ))
569 .await
570 }
571
572 #[sync_docs]
573 pub async fn delete_stream(&self, req: types::DeleteStreamRequest) -> Result<(), ClientError> {
574 self.inner
575 .send_retryable(DeleteStreamServiceRequest::new(
576 self.inner.basin_service_client(),
577 req,
578 ))
579 .await
580 }
581}
582
583#[derive(Debug, Clone)]
585pub struct StreamClient {
586 pub(crate) inner: ClientInner,
587 pub(crate) stream: String,
588}
589
590impl StreamClient {
591 pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
593 BasinClient::new(config, basin).stream_client(stream)
594 }
595
596 #[cfg(feature = "connector")]
598 pub fn new_with_connector<C>(
599 config: ClientConfig,
600 basin: types::BasinName,
601 stream: impl Into<String>,
602 connector: C,
603 ) -> Self
604 where
605 C: tower_service::Service<http::Uri> + Send + 'static,
606 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
607 C::Future: Send,
608 C::Error: std::error::Error + Send + Sync + 'static,
609 {
610 BasinClient::new_with_connector(config, basin, connector).stream_client(stream)
611 }
612
613 #[sync_docs]
614 pub async fn check_tail(&self) -> Result<u64, ClientError> {
615 self.inner
616 .send_retryable(CheckTailServiceRequest::new(
617 self.inner.stream_service_client(),
618 &self.stream,
619 ))
620 .await
621 }
622
623 #[sync_docs]
624 pub async fn read(&self, req: types::ReadRequest) -> Result<types::ReadOutput, ClientError> {
625 self.inner
626 .send_retryable(ReadServiceRequest::new(
627 self.inner.stream_service_client(),
628 &self.stream,
629 req,
630 self.inner.config.compression,
631 ))
632 .await
633 }
634
635 #[sync_docs]
636 pub async fn read_session(
637 &self,
638 req: types::ReadSessionRequest,
639 ) -> Result<Streaming<types::ReadOutput>, ClientError> {
640 let request = ReadSessionServiceRequest::new(
641 self.inner.stream_service_client(),
642 &self.stream,
643 req,
644 self.inner.config.compression,
645 );
646 self.inner
647 .send_retryable(request.clone())
648 .await
649 .map(|responses| {
650 Box::pin(read_resumption_stream(
651 request,
652 responses,
653 self.inner.clone(),
654 )) as _
655 })
656 }
657
658 #[sync_docs]
659 pub async fn append(
660 &self,
661 req: types::AppendInput,
662 ) -> Result<types::AppendOutput, ClientError> {
663 let frame_signal = FrameSignal::new();
664 self.inner
665 .send_retryable(AppendServiceRequest::new(
666 self.inner
667 .frame_monitoring_stream_service_client(frame_signal.clone()),
668 self.inner.config.append_retry_policy.clone(),
669 frame_signal,
670 &self.stream,
671 req,
672 self.inner.config.compression,
673 ))
674 .await
675 }
676
677 #[sync_docs]
678 #[allow(clippy::unused_async)]
679 pub async fn append_session<S>(
680 &self,
681 req: S,
682 ) -> Result<Streaming<types::AppendOutput>, ClientError>
683 where
684 S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
685 {
686 let (response_tx, response_rx) = mpsc::channel(10);
687 _ = tokio::spawn(append_session::manage_session(
688 self.clone(),
689 req,
690 response_tx,
691 self.inner.config.compression,
692 ));
693
694 Ok(Box::pin(ReceiverStream::new(response_rx)))
695 }
696}
697
698#[derive(Debug, Clone)]
699enum ClientKind {
700 Account,
701 Basin(types::BasinName),
702}
703
704impl ClientKind {
705 fn to_authority(&self, endpoints: &S2Endpoints) -> Authority {
706 match self {
707 ClientKind::Account => endpoints.account.clone(),
708 ClientKind::Basin(basin) => match &endpoints.basin {
709 BasinEndpoint::ParentZone(zone) => format!("{basin}.{zone}")
710 .try_into()
711 .expect("valid authority as basin pre-validated"),
712 BasinEndpoint::Direct(endpoint) => endpoint.clone(),
713 },
714 }
715 }
716}
717
718#[derive(Debug, Clone)]
719pub(crate) struct ClientInner {
720 kind: ClientKind,
721 channel: Channel,
722 pub(crate) config: ClientConfig,
723}
724
725impl ClientInner {
726 fn new<C>(kind: ClientKind, config: ClientConfig, connector: Option<C>) -> Self
727 where
728 C: tower_service::Service<http::Uri> + Send + 'static,
729 C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
730 C::Future: Send,
731 C::Error: std::error::Error + Send + Sync + 'static,
732 {
733 let authority = kind.to_authority(&config.endpoints);
734
735 #[cfg(not(feature = "connector"))]
736 let scheme = "https";
737 #[cfg(feature = "connector")]
738 let scheme = config.uri_scheme.as_str();
739
740 let endpoint = format!("{scheme}://{authority}")
741 .parse::<Endpoint>()
742 .expect("previously validated endpoint scheme and authority")
743 .user_agent(config.user_agent.clone())
744 .expect("converting HeaderValue into HeaderValue")
745 .http2_adaptive_window(true)
746 .tls_config(
747 ClientTlsConfig::default()
748 .with_webpki_roots()
749 .assume_http2(true),
750 )
751 .expect("valid TLS config")
752 .connect_timeout(config.connection_timeout)
753 .timeout(config.request_timeout);
754
755 let channel = if let Some(connector) = connector {
756 assert!(
757 matches!(&config.endpoints.basin, BasinEndpoint::Direct(a) if a == &config.endpoints.account),
758 "Connector only supported when connecting directly to a cell for account as well as basins"
759 );
760 endpoint.connect_with_connector_lazy(connector)
761 } else {
762 endpoint.connect_lazy()
763 };
764
765 Self {
766 kind,
767 channel,
768 config,
769 }
770 }
771
772 fn for_basin(&self, basin: types::BasinName) -> ClientInner {
773 let current_authority = self.kind.to_authority(&self.config.endpoints);
774 let new_kind = ClientKind::Basin(basin);
775 let new_authority = new_kind.to_authority(&self.config.endpoints);
776 if current_authority == new_authority {
777 Self {
778 kind: new_kind,
779 ..self.clone()
780 }
781 } else {
782 Self::new(new_kind, self.config.clone(), DEFAULT_CONNECTOR)
783 }
784 }
785
786 pub(crate) async fn send<T: ServiceRequest>(
787 &self,
788 service_req: T,
789 ) -> Result<T::Response, ClientError> {
790 let basin_header = match (&self.kind, &self.config.endpoints.basin) {
791 (ClientKind::Basin(basin), BasinEndpoint::Direct(_)) => {
792 Some(AsciiMetadataValue::from_str(basin).expect("valid"))
793 }
794 _ => None,
795 };
796 send_request(service_req, &self.config.token, basin_header).await
797 }
798
799 async fn send_retryable_with_backoff<T: ServiceRequest + Clone>(
800 &self,
801 service_req: T,
802 backoff_builder: impl BackoffBuilder,
803 ) -> Result<T::Response, ClientError> {
804 let retry_fn = || async { self.send(service_req.clone()).await };
805
806 retry_fn
807 .retry(backoff_builder)
808 .when(|e| service_req.should_retry(e))
809 .await
810 }
811
812 pub(crate) async fn send_retryable<T: ServiceRequest + Clone>(
813 &self,
814 service_req: T,
815 ) -> Result<T::Response, ClientError> {
816 self.send_retryable_with_backoff(service_req, self.backoff_builder())
817 .await
818 }
819
820 pub(crate) fn backoff_builder(&self) -> impl BackoffBuilder + use<> {
821 ConstantBuilder::default()
822 .with_delay(self.config.retry_backoff_duration)
823 .with_max_times(self.config.max_attempts)
824 .with_jitter()
825 }
826
827 fn account_service_client(&self) -> AccountServiceClient<Channel> {
828 AccountServiceClient::new(self.channel.clone())
829 }
830
831 fn basin_service_client(&self) -> BasinServiceClient<Channel> {
832 BasinServiceClient::new(self.channel.clone())
833 }
834
835 pub(crate) fn stream_service_client(&self) -> StreamServiceClient<Channel> {
836 StreamServiceClient::new(self.channel.clone())
837 }
838
839 pub(crate) fn frame_monitoring_stream_service_client(
840 &self,
841 frame_signal: FrameSignal,
842 ) -> StreamServiceClient<RequestFrameMonitor> {
843 StreamServiceClient::new(RequestFrameMonitor::new(self.channel.clone(), frame_signal))
844 }
845}
846
847fn read_resumption_stream(
848 mut request: ReadSessionServiceRequest,
849 mut responses: ServiceStreamingResponse<ReadSessionStreamingResponse>,
850 client: ClientInner,
851) -> impl Send + futures::Stream<Item = Result<types::ReadOutput, ClientError>> {
852 let mut backoff = None;
853 async_stream::stream! {
854 while let Some(item) = responses.next().await {
855 match item {
856 Err(e) if request.should_retry(&e) => {
857 if backoff.is_none() {
858 backoff = Some(client.backoff_builder().build());
859 }
860 if let Some(duration) = backoff.as_mut().and_then(|b| b.next()) {
861 sleep(duration).await;
862 if let Ok(new_responses) = client.send_retryable(request.clone()).await {
863 responses = new_responses;
864 } else {
865 yield Err(e);
866 }
867 } else {
868 yield Err(e);
869 }
870 }
871 item => {
872 if item.is_ok() {
873 backoff = None;
874 }
875 if let Ok(types::ReadOutput::Batch(types::SequencedRecordBatch { records })) = &item {
876 let req = request.req_mut();
877 if let Some(record) = records.last() {
878 req.start_seq_num = record.seq_num + 1;
879 }
880 if let Some(count) = req.limit.count.as_mut() {
881 *count = count.saturating_sub(records.len() as u64);
882 }
883 if let Some(bytes) = req.limit.bytes.as_mut() {
884 *bytes = bytes.saturating_sub(records.metered_bytes());
885 }
886 }
887 yield item;
888 }
889 }
890 }
891 }
892}