Skip to main content

tansu_storage/
service.rs

1// Copyright ⓒ 2024-2026 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter_user_scram_credentials;
16mod consumer_group_describe;
17mod create_acls;
18mod create_topics;
19mod delete_groups;
20mod delete_records;
21mod delete_topics;
22mod describe_acls;
23mod describe_cluster;
24mod describe_configs;
25mod describe_groups;
26mod describe_topic_partitions;
27mod describe_user_scram_credentials;
28mod fetch;
29mod find_coordinator;
30mod get_telemetry_subscriptions;
31mod incremental_alter_configs;
32mod init_producer_id;
33mod list_groups;
34mod list_offsets;
35mod list_partition_reassignments;
36mod metadata;
37mod produce;
38mod txn;
39
40use std::{
41    collections::BTreeMap,
42    fmt::{self, Debug, Display, Formatter},
43    sync::LazyLock,
44    time::{Duration, SystemTime},
45};
46
47pub use alter_user_scram_credentials::AlterUserScramCredentialsService;
48use async_trait::async_trait;
49pub use consumer_group_describe::ConsumerGroupDescribeService;
50pub use create_acls::CreateAclsService;
51pub use create_topics::CreateTopicsService;
52pub use delete_groups::DeleteGroupsService;
53pub use delete_records::DeleteRecordsService;
54pub use delete_topics::DeleteTopicsService;
55pub use describe_acls::DescribeAclsService;
56pub use describe_cluster::DescribeClusterService;
57pub use describe_configs::DescribeConfigsService;
58pub use describe_groups::DescribeGroupsService;
59pub use describe_topic_partitions::DescribeTopicPartitionsService;
60pub use describe_user_scram_credentials::DescribeUserScramCredentialsService;
61pub use fetch::FetchService;
62pub use find_coordinator::FindCoordinatorService;
63pub use get_telemetry_subscriptions::GetTelemetrySubscriptionsService;
64pub use incremental_alter_configs::IncrementalAlterConfigsService;
65pub use init_producer_id::InitProducerIdService;
66pub use list_groups::ListGroupsService;
67pub use list_offsets::ListOffsetsService;
68pub use list_partition_reassignments::ListPartitionReassignmentsService;
69pub use metadata::MetadataService;
70use opentelemetry::{
71    KeyValue,
72    metrics::{Counter, Gauge, Histogram},
73};
74pub use produce::ProduceService;
75use rama::{Context, Layer, Service};
76use tansu_sans_io::{
77    ConfigResource, ErrorCode, IsolationLevel, ListOffset, ScramMechanism,
78    create_topics_request::CreatableTopic, delete_groups_response::DeletableGroupResult,
79    delete_records_request::DeleteRecordsTopic, delete_records_response::DeleteRecordsTopicResult,
80    describe_cluster_response::DescribeClusterBroker,
81    describe_configs_response::DescribeConfigsResult,
82    describe_topic_partitions_response::DescribeTopicPartitionsResponseTopic,
83    incremental_alter_configs_request::AlterConfigsResource,
84    incremental_alter_configs_response::AlterConfigsResourceResponse,
85    list_groups_response::ListedGroup, record::deflated,
86    txn_offset_commit_response::TxnOffsetCommitResponseTopic,
87};
88use tokio::sync::{
89    mpsc::{self, error::SendError},
90    oneshot,
91};
92use tokio_util::sync::CancellationToken;
93use tracing::{debug, error, instrument};
94pub use txn::add_offsets::AddOffsetsService as TxnAddOffsetsService;
95pub use txn::add_partitions::AddPartitionService as TxnAddPartitionService;
96pub use txn::offset_commit::OffsetCommitService as TxnOffsetCommitService;
97use url::Url;
98use uuid::Uuid;
99
100use crate::{
101    BrokerRegistrationRequest, Error, GroupDetail, ListOffsetResponse, METER, MetadataResponse,
102    NamedGroupDetail, OffsetCommitRequest, OffsetStage, ProducerIdResponse, Result,
103    ScramCredential, Storage, TopicId, Topition, TxnAddPartitionsRequest, TxnAddPartitionsResponse,
104    TxnOffsetCommitRequest, UpdateError, Version,
105};
106
107#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
108pub enum Request {
109    RegisterBroker(BrokerRegistrationRequest),
110    IncrementalAlterResource(AlterConfigsResource),
111    CreateTopic {
112        topic: CreatableTopic,
113        validate_only: bool,
114    },
115    DeleteRecords(Vec<DeleteRecordsTopic>),
116    DeleteTopic(TopicId),
117    Brokers,
118    Produce {
119        transaction_id: Option<String>,
120        topition: Topition,
121        batch: deflated::Batch,
122    },
123    Fetch {
124        topition: Topition,
125        offset: i64,
126        min_bytes: u32,
127        max_bytes: u32,
128        isolation: IsolationLevel,
129    },
130    OffsetStage(Topition),
131    ListOffsets {
132        isolation_level: IsolationLevel,
133        offsets: Vec<(Topition, ListOffset)>,
134    },
135    OffsetCommit {
136        group_id: String,
137        retention_time_ms: Option<Duration>,
138        offsets: Vec<(Topition, OffsetCommitRequest)>,
139    },
140    CommittedOffsetTopitions(String),
141    OffsetFetch {
142        group_id: Option<String>,
143        topics: Vec<Topition>,
144        require_stable: Option<bool>,
145    },
146    Metadata(Option<Vec<TopicId>>),
147    DescribeConfig {
148        name: String,
149        resource: ConfigResource,
150        keys: Option<Vec<String>>,
151    },
152    DescribeTopicPartitions {
153        topics: Option<Vec<TopicId>>,
154        partition_limit: i32,
155        cursor: Option<Topition>,
156    },
157    ListGroups(Option<Vec<String>>),
158    DeleteGroups(Option<Vec<String>>),
159    DescribeGroups {
160        group_ids: Option<Vec<String>>,
161        include_authorized_operations: bool,
162    },
163    UpdateGroup {
164        group_id: String,
165        detail: GroupDetail,
166        version: Option<Version>,
167    },
168    InitProducer {
169        transaction_id: Option<String>,
170        transaction_timeout_ms: i32,
171        producer_id: Option<i64>,
172        producer_epoch: Option<i16>,
173    },
174    TxnAddOffsets {
175        transaction_id: String,
176        producer_id: i64,
177        producer_epoch: i16,
178        group_id: String,
179    },
180    TxnAddPartitions(TxnAddPartitionsRequest),
181    TxnOffsetCommit(TxnOffsetCommitRequest),
182    TxnEnd {
183        transaction_id: String,
184        producer_id: i64,
185        producer_epoch: i16,
186        committed: bool,
187    },
188    Maintain(SystemTime),
189    ClusterId,
190    Node,
191    AdvertisedListener,
192    DeleteUserScramCredential {
193        user: String,
194        mechanism: ScramMechanism,
195    },
196    UpsertUserScramCredential {
197        user: String,
198        mechanism: ScramMechanism,
199        credential: ScramCredential,
200    },
201    UserScramCredential {
202        user: String,
203        mechanism: ScramMechanism,
204    },
205    Ping,
206}
207
208impl Display for Request {
209    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
210        match self {
211            Self::AdvertisedListener => f.write_str("AdvertisedListener"),
212            Self::Brokers => f.write_str("Brokers"),
213            Self::ClusterId => f.write_str("ClusterId"),
214            Self::CommittedOffsetTopitions(_) => f.write_str("CommittedOffsetTopitions"),
215            Self::CreateTopic { .. } => f.write_str("CreateTopic"),
216            Self::DeleteGroups(_) => f.write_str("DeleteGroups"),
217            Self::DeleteRecords(_) => f.write_str("DeleteRecords"),
218            Self::DeleteTopic(_) => f.write_str("DeleteTopic"),
219            Self::DescribeConfig { .. } => f.write_str("DescribeConfig"),
220            Self::DescribeGroups { .. } => f.write_str("DescribeGroups"),
221            Self::DescribeTopicPartitions { .. } => f.write_str("DescribeTopicPartitions"),
222            Self::Fetch { .. } => f.write_str("Fetch"),
223            Self::IncrementalAlterResource(_) => f.write_str("IncrementalAlterResource"),
224            Self::InitProducer { .. } => f.write_str("InitProducer"),
225            Self::ListGroups(_) => f.write_str("ListGroups"),
226            Self::ListOffsets { .. } => f.write_str("ListOffsets"),
227            Self::Maintain(_) => f.write_str("Maintain"),
228            Self::Metadata(_) => f.write_str("Metadata"),
229            Self::Node => f.write_str("Node"),
230            Self::OffsetCommit { .. } => f.write_str("OffsetCommit"),
231            Self::OffsetFetch { .. } => f.write_str("OffsetFetch"),
232            Self::OffsetStage(_) => f.write_str("OffsetStage"),
233            Self::Produce { .. } => f.write_str("Produce"),
234            Self::RegisterBroker(_) => f.write_str("RegisterBroker"),
235            Self::TxnAddOffsets { .. } => f.write_str("TxnAddOffsets"),
236            Self::TxnAddPartitions(_) => f.write_str("TxnAddPartitions"),
237            Self::TxnEnd { .. } => f.write_str("TxnEnd"),
238            Self::TxnOffsetCommit(_) => f.write_str("TxnOffsetCommit"),
239            Self::UpdateGroup { .. } => f.write_str("UpdateGroup"),
240            Self::DeleteUserScramCredential { .. } => f.write_str("DeleteUserScramCredential"),
241            Self::UpsertUserScramCredential { .. } => f.write_str("UpsertUserScramCredential"),
242            Self::UserScramCredential { .. } => f.write_str("UserScramCredential"),
243            Self::Ping => f.write_str("Ping"),
244        }
245    }
246}
247
248#[derive(Clone, Debug)]
249pub enum Response {
250    AdvertisedListener(Result<Url>),
251    Brokers(Result<Vec<DescribeClusterBroker>>),
252    ClusterId(Result<String>),
253    CommittedOffsetTopitions(Result<BTreeMap<Topition, i64>>),
254    CreateTopic(Result<Uuid>),
255    DeleteGroups(Result<Vec<DeletableGroupResult>>),
256    DeleteRecords(Result<Vec<DeleteRecordsTopicResult>>),
257    DeleteTopic(Result<ErrorCode>),
258    DeleteUserScramCredential(Result<()>),
259    DescribeConfig(Result<DescribeConfigsResult>),
260    DescribeGroups(Result<Vec<NamedGroupDetail>>),
261    DescribeTopicPartitions(Result<Vec<DescribeTopicPartitionsResponseTopic>>),
262    Fetch(Result<Vec<deflated::Batch>>),
263    IncrementalAlterResponse(Result<AlterConfigsResourceResponse>),
264    InitProducer(Result<ProducerIdResponse>),
265    ListGroups(Result<Vec<ListedGroup>>),
266    ListOffsets(Result<Vec<(Topition, ListOffsetResponse)>>),
267    Maintain(Result<()>),
268    Metadata(Result<MetadataResponse>),
269    Node(Result<i32>),
270    OffsetCommit(Result<Vec<(Topition, ErrorCode)>>),
271    OffsetFetch(Result<BTreeMap<Topition, i64>>),
272    OffsetStage(Result<OffsetStage>),
273    Ping(Result<()>),
274    Produce(Result<i64>),
275    RegisterBroker(Result<()>),
276    TxnAddOffsets(Result<ErrorCode>),
277    TxnAddPartitions(Result<TxnAddPartitionsResponse>),
278    TxnEnd(Result<ErrorCode>),
279    TxnOffsetCommit(Result<Vec<TxnOffsetCommitResponseTopic>>),
280    UpdateGroup(Result<Version, UpdateError<GroupDetail>>),
281    UpsertUserScramCredential(Result<()>),
282    UserScramCredential(Result<Option<ScramCredential>>),
283}
284
285pub type RequestSender = mpsc::Sender<(Request, oneshot::Sender<Response>)>;
286pub type RequestReceiver = mpsc::Receiver<(Request, oneshot::Sender<Response>)>;
287
288pub fn bounded_channel(buffer: usize) -> (RequestSender, RequestReceiver) {
289    mpsc::channel::<(Request, oneshot::Sender<Response>)>(buffer)
290}
291
292#[derive(Clone, Debug, thiserror::Error)]
293pub enum ServiceError {
294    Storage(Error),
295    UpdateGroupDetail(UpdateError<GroupDetail>),
296}
297
298impl Display for ServiceError {
299    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
300        write!(f, "{self:?}")
301    }
302}
303
304impl From<SendError<()>> for ServiceError {
305    fn from(_value: SendError<()>) -> Self {
306        Self::Storage(Error::UnableToSend)
307    }
308}
309
310impl From<Error> for ServiceError {
311    fn from(value: Error) -> Self {
312        Self::Storage(value)
313    }
314}
315
316impl From<UpdateError<GroupDetail>> for ServiceError {
317    fn from(value: UpdateError<GroupDetail>) -> Self {
318        Self::UpdateGroupDetail(value)
319    }
320}
321
322impl From<ServiceError> for Error {
323    fn from(value: ServiceError) -> Self {
324        if let ServiceError::Storage(error) = value {
325            error
326        } else {
327            unreachable!()
328        }
329    }
330}
331
332impl From<ServiceError> for UpdateError<GroupDetail> {
333    fn from(value: ServiceError) -> Self {
334        if let ServiceError::UpdateGroupDetail(error) = value {
335            error
336        } else {
337            unreachable!()
338        }
339    }
340}
341
342#[derive(Copy, Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
343pub struct RequestLayer;
344
345impl<S> Layer<S> for RequestLayer {
346    type Service = RequestService<S>;
347
348    fn layer(&self, inner: S) -> Self::Service {
349        Self::Service { inner }
350    }
351}
352
353#[derive(Copy, Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
354pub struct RequestService<S> {
355    inner: S,
356}
357
358impl<State, S> Service<State, Request> for RequestService<S>
359where
360    S: Service<State, Request>,
361    State: Send + Sync + 'static,
362{
363    type Response = S::Response;
364    type Error = S::Error;
365
366    async fn serve(
367        &self,
368        ctx: Context<State>,
369        req: Request,
370    ) -> Result<Self::Response, Self::Error> {
371        debug!(?req);
372        self.inner.serve(ctx, req).await
373    }
374}
375
376/// A [`Service`] sending [`Request`]s over a [`RequestSender`] channel
377#[derive(Clone, Debug)]
378pub struct RequestChannelService {
379    tx: RequestSender,
380}
381
382impl RequestChannelService {
383    pub fn new(tx: RequestSender) -> Self {
384        Self { tx }
385    }
386
387    fn elapsed_millis(&self, start: SystemTime) -> u64 {
388        start
389            .elapsed()
390            .map_or(0, |duration| duration.as_millis() as u64)
391    }
392}
393
394static STORAGE_CHANNEL_CAPACITY: LazyLock<Gauge<u64>> = LazyLock::new(|| {
395    METER
396        .u64_gauge("tansu_storage_channel_capacity")
397        .with_description("Storage channel capacity")
398        .build()
399});
400
401impl<State> Service<State, Request> for RequestChannelService
402where
403    State: Send + Sync + 'static,
404{
405    type Response = Response;
406    type Error = ServiceError;
407
408    #[instrument(skip_all)]
409    async fn serve(
410        &self,
411        ctx: Context<State>,
412        req: Request,
413    ) -> Result<Self::Response, Self::Error> {
414        let _ = ctx;
415        let (resp_tx, resp_rx) = oneshot::channel();
416
417        let start = SystemTime::now();
418
419        let operation = req.to_string();
420        let attributes = [KeyValue::new("operation", operation.clone())];
421
422        let capacity = self.tx.capacity();
423        STORAGE_CHANNEL_CAPACITY.record(capacity as u64, &attributes);
424        debug!(operation, capacity);
425
426        self.tx
427            .reserve()
428            .await
429            .map(|permit| permit.send((req, resp_tx)))
430            .inspect(|_| {
431                let permit_elapsed = self.elapsed_millis(start);
432                STORAGE_CHANNEL_PERMIT_DURATION.record(permit_elapsed, &attributes);
433                debug!(operation, permit_elapsed);
434            })
435            .inspect_err(|err| {
436                error!(operation, ?err);
437                STORAGE_CHANNEL_ERROR.add(1, &attributes);
438            })?;
439
440        resp_rx
441            .await
442            .map_err(|_| Error::OneshotRecv.into())
443            .inspect(|_| {
444                let elapsed_millis = self.elapsed_millis(start);
445                STORAGE_CHANNEL_REQUEST_DURATION.record(elapsed_millis, &attributes);
446                debug!(operation, elapsed_millis);
447            })
448            .inspect_err(|err| {
449                error!(operation, ?err);
450                STORAGE_CHANNEL_ERROR.add(1, &attributes);
451            })
452    }
453}
454
455static STORAGE_CHANNEL_REQUEST_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
456    METER
457        .u64_histogram("tansu_storage_channel_request_duration")
458        .with_unit("ms")
459        .with_description("Storage channel request latency in milliseconds")
460        .build()
461});
462
463static STORAGE_CHANNEL_PERMIT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
464    METER
465        .u64_histogram("tansu_storage_channel_permit_duration")
466        .with_unit("ms")
467        .with_description("Storage channel permit latency in milliseconds")
468        .build()
469});
470
471static STORAGE_CHANNEL_ERROR: LazyLock<Counter<u64>> = LazyLock::new(|| {
472    METER
473        .u64_counter("tansu_storage_channel_error")
474        .with_description("Storage channel error count")
475        .build()
476});
477
478#[async_trait]
479impl Storage for RequestChannelService {
480    #[instrument(skip_all)]
481    async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
482        self.serve(
483            Context::default(),
484            Request::RegisterBroker(broker_registration),
485        )
486        .await
487        .and_then(|response| {
488            if let Response::RegisterBroker(inner) = response {
489                inner.map_err(Into::into)
490            } else {
491                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
492            }
493        })
494        .map_err(Into::into)
495    }
496
497    #[instrument(skip_all)]
498    async fn incremental_alter_resource(
499        &self,
500        resource: AlterConfigsResource,
501    ) -> Result<AlterConfigsResourceResponse> {
502        self.serve(
503            Context::default(),
504            Request::IncrementalAlterResource(resource),
505        )
506        .await
507        .and_then(|response| {
508            if let Response::IncrementalAlterResponse(inner) = response {
509                inner.map_err(Into::into)
510            } else {
511                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
512            }
513        })
514        .map_err(Into::into)
515    }
516
517    #[instrument(skip_all)]
518    async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
519        self.serve(
520            Context::default(),
521            Request::CreateTopic {
522                topic,
523                validate_only,
524            },
525        )
526        .await
527        .and_then(|response| {
528            if let Response::CreateTopic(inner) = response {
529                inner.map_err(Into::into)
530            } else {
531                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
532            }
533        })
534        .map_err(Into::into)
535    }
536
537    #[instrument(skip_all)]
538    async fn delete_records(
539        &self,
540        topics: &[DeleteRecordsTopic],
541    ) -> Result<Vec<DeleteRecordsTopicResult>> {
542        self.serve(
543            Context::default(),
544            Request::DeleteRecords(Vec::from(topics)),
545        )
546        .await
547        .and_then(|response| {
548            if let Response::DeleteRecords(inner) = response {
549                inner.map_err(Into::into)
550            } else {
551                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
552            }
553        })
554        .map_err(Into::into)
555    }
556
557    #[instrument(skip_all)]
558    async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
559        self.serve(Context::default(), Request::DeleteTopic(topic.to_owned()))
560            .await
561            .and_then(|response| {
562                if let Response::DeleteTopic(inner) = response {
563                    inner.map_err(Into::into)
564                } else {
565                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
566                }
567            })
568            .map_err(Into::into)
569    }
570
571    #[instrument(skip_all)]
572    async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
573        self.serve(Context::default(), Request::Brokers)
574            .await
575            .and_then(|response| {
576                if let Response::Brokers(inner) = response {
577                    inner.map_err(Into::into)
578                } else {
579                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
580                }
581            })
582            .map_err(Into::into)
583    }
584
585    #[instrument(skip_all)]
586    async fn produce(
587        &self,
588        transaction_id: Option<&str>,
589        topition: &Topition,
590        batch: deflated::Batch,
591    ) -> Result<i64> {
592        let transaction_id = transaction_id.map(|s| s.to_string());
593        let topition = topition.to_owned();
594
595        self.serve(
596            Context::default(),
597            Request::Produce {
598                transaction_id,
599                topition,
600                batch,
601            },
602        )
603        .await
604        .and_then(|response| {
605            if let Response::Produce(inner) = response {
606                inner.map_err(Into::into)
607            } else {
608                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
609            }
610        })
611        .map_err(Into::into)
612    }
613
614    #[instrument(skip_all)]
615    async fn fetch(
616        &self,
617        topition: &'_ Topition,
618        offset: i64,
619        min_bytes: u32,
620        max_bytes: u32,
621        isolation: IsolationLevel,
622    ) -> Result<Vec<deflated::Batch>> {
623        let topition = topition.to_owned();
624
625        self.serve(
626            Context::default(),
627            Request::Fetch {
628                topition,
629                offset,
630                min_bytes,
631                max_bytes,
632                isolation,
633            },
634        )
635        .await
636        .and_then(|response| {
637            if let Response::Fetch(inner) = response {
638                inner.map_err(Into::into)
639            } else {
640                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
641            }
642        })
643        .map_err(Into::into)
644    }
645
646    #[instrument(skip_all)]
647    async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
648        self.serve(
649            Context::default(),
650            Request::OffsetStage(topition.to_owned()),
651        )
652        .await
653        .and_then(|response| {
654            if let Response::OffsetStage(inner) = response {
655                inner.map_err(Into::into)
656            } else {
657                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
658            }
659        })
660        .map_err(Into::into)
661    }
662
663    #[instrument(skip_all)]
664    async fn list_offsets(
665        &self,
666        isolation_level: IsolationLevel,
667        offsets: &[(Topition, ListOffset)],
668    ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
669        let offsets = Vec::from(offsets);
670
671        self.serve(
672            Context::default(),
673            Request::ListOffsets {
674                isolation_level,
675                offsets,
676            },
677        )
678        .await
679        .and_then(|response| {
680            if let Response::ListOffsets(inner) = response {
681                inner.map_err(Into::into)
682            } else {
683                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
684            }
685        })
686        .map_err(Into::into)
687    }
688
689    #[instrument(skip_all)]
690    async fn offset_commit(
691        &self,
692        group_id: &str,
693        retention_time_ms: Option<Duration>,
694        offsets: &[(Topition, OffsetCommitRequest)],
695    ) -> Result<Vec<(Topition, ErrorCode)>> {
696        let group_id = group_id.to_string();
697        let offsets = Vec::from(offsets);
698
699        self.serve(
700            Context::default(),
701            Request::OffsetCommit {
702                group_id,
703                retention_time_ms,
704                offsets,
705            },
706        )
707        .await
708        .and_then(|response| {
709            if let Response::OffsetCommit(inner) = response {
710                inner.map_err(Into::into)
711            } else {
712                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
713            }
714        })
715        .map_err(Into::into)
716    }
717
718    #[instrument(skip_all)]
719    async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
720        let group_id = group_id.to_string();
721
722        self.serve(
723            Context::default(),
724            Request::CommittedOffsetTopitions(group_id),
725        )
726        .await
727        .and_then(|response| {
728            if let Response::CommittedOffsetTopitions(inner) = response {
729                inner.map_err(Into::into)
730            } else {
731                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
732            }
733        })
734        .map_err(Into::into)
735    }
736
737    #[instrument(skip_all)]
738    async fn offset_fetch(
739        &self,
740        group_id: Option<&str>,
741        topics: &[Topition],
742        require_stable: Option<bool>,
743    ) -> Result<BTreeMap<Topition, i64>> {
744        let group_id = group_id.map(|s| s.to_string());
745        let topics = Vec::from(topics);
746
747        self.serve(
748            Context::default(),
749            Request::OffsetFetch {
750                group_id,
751                topics,
752                require_stable,
753            },
754        )
755        .await
756        .and_then(|response| {
757            if let Response::OffsetFetch(inner) = response {
758                inner.map_err(Into::into)
759            } else {
760                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
761            }
762        })
763        .map_err(Into::into)
764    }
765
766    #[instrument(skip_all)]
767    async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
768        let topics = topics.map(Vec::from);
769
770        self.serve(Context::default(), Request::Metadata(topics))
771            .await
772            .and_then(|response| {
773                if let Response::Metadata(inner) = response {
774                    inner.map_err(Into::into)
775                } else {
776                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
777                }
778            })
779            .map_err(Into::into)
780    }
781
782    #[instrument(skip_all)]
783    async fn describe_config(
784        &self,
785        name: &str,
786        resource: ConfigResource,
787        keys: Option<&[String]>,
788    ) -> Result<DescribeConfigsResult> {
789        let name = name.to_string();
790        let keys = keys.map(Vec::from);
791
792        self.serve(
793            Context::default(),
794            Request::DescribeConfig {
795                name,
796                resource,
797                keys,
798            },
799        )
800        .await
801        .and_then(|response| {
802            if let Response::DescribeConfig(inner) = response {
803                inner.map_err(Into::into)
804            } else {
805                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
806            }
807        })
808        .map_err(Into::into)
809    }
810
811    #[instrument(skip_all)]
812    async fn describe_topic_partitions(
813        &self,
814        topics: Option<&[TopicId]>,
815        partition_limit: i32,
816        cursor: Option<Topition>,
817    ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
818        let topics = topics.map(Vec::from);
819
820        self.serve(
821            Context::default(),
822            Request::DescribeTopicPartitions {
823                topics,
824                partition_limit,
825                cursor,
826            },
827        )
828        .await
829        .and_then(|response| {
830            if let Response::DescribeTopicPartitions(inner) = response {
831                inner.map_err(Into::into)
832            } else {
833                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
834            }
835        })
836        .map_err(Into::into)
837    }
838
839    #[instrument(skip_all)]
840    async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
841        let states_filter = states_filter.map(Vec::from);
842
843        self.serve(Context::default(), Request::ListGroups(states_filter))
844            .await
845            .and_then(|response| {
846                if let Response::ListGroups(inner) = response {
847                    inner.map_err(Into::into)
848                } else {
849                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
850                }
851            })
852            .map_err(Into::into)
853    }
854
855    #[instrument(skip_all)]
856    async fn delete_groups(
857        &self,
858        group_ids: Option<&[String]>,
859    ) -> Result<Vec<DeletableGroupResult>> {
860        let group_ids = group_ids.map(Vec::from);
861
862        self.serve(Context::default(), Request::DeleteGroups(group_ids))
863            .await
864            .and_then(|response| {
865                if let Response::DeleteGroups(inner) = response {
866                    inner.map_err(Into::into)
867                } else {
868                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
869                }
870            })
871            .map_err(Into::into)
872    }
873
874    #[instrument(skip_all)]
875    async fn describe_groups(
876        &self,
877        group_ids: Option<&[String]>,
878        include_authorized_operations: bool,
879    ) -> Result<Vec<NamedGroupDetail>> {
880        let group_ids = group_ids.map(Vec::from);
881
882        self.serve(
883            Context::default(),
884            Request::DescribeGroups {
885                group_ids,
886                include_authorized_operations,
887            },
888        )
889        .await
890        .and_then(|response| {
891            if let Response::DescribeGroups(inner) = response {
892                inner.map_err(Into::into)
893            } else {
894                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
895            }
896        })
897        .map_err(Into::into)
898    }
899
900    #[instrument(skip_all)]
901    async fn update_group(
902        &self,
903        group_id: &str,
904        detail: GroupDetail,
905        version: Option<Version>,
906    ) -> Result<Version, UpdateError<GroupDetail>> {
907        let group_id = group_id.to_string();
908
909        self.serve(
910            Context::default(),
911            Request::UpdateGroup {
912                group_id,
913                detail,
914                version,
915            },
916        )
917        .await
918        .and_then(|response| {
919            if let Response::UpdateGroup(inner) = response {
920                inner.map_err(Into::into)
921            } else {
922                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
923            }
924        })
925        .map_err(Into::into)
926    }
927
928    #[instrument(skip_all)]
929    async fn init_producer(
930        &self,
931        transaction_id: Option<&str>,
932        transaction_timeout_ms: i32,
933        producer_id: Option<i64>,
934        producer_epoch: Option<i16>,
935    ) -> Result<ProducerIdResponse> {
936        let transaction_id = transaction_id.map(|transaction_id| transaction_id.to_owned());
937
938        self.serve(
939            Context::default(),
940            Request::InitProducer {
941                transaction_id,
942                transaction_timeout_ms,
943                producer_id,
944                producer_epoch,
945            },
946        )
947        .await
948        .and_then(|response| {
949            if let Response::InitProducer(inner) = response {
950                inner.map_err(Into::into)
951            } else {
952                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
953            }
954        })
955        .map_err(Into::into)
956    }
957
958    #[instrument(skip_all)]
959    async fn txn_add_offsets(
960        &self,
961        transaction_id: &str,
962        producer_id: i64,
963        producer_epoch: i16,
964        group_id: &str,
965    ) -> Result<ErrorCode> {
966        let transaction_id = transaction_id.to_string();
967        let group_id = group_id.to_string();
968
969        self.serve(
970            Context::default(),
971            Request::TxnAddOffsets {
972                transaction_id,
973                producer_id,
974                producer_epoch,
975                group_id,
976            },
977        )
978        .await
979        .and_then(|response| {
980            if let Response::TxnAddOffsets(inner) = response {
981                inner.map_err(Into::into)
982            } else {
983                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
984            }
985        })
986        .map_err(Into::into)
987    }
988
989    #[instrument(skip_all)]
990    async fn txn_add_partitions(
991        &self,
992        partitions: TxnAddPartitionsRequest,
993    ) -> Result<TxnAddPartitionsResponse> {
994        self.serve(Context::default(), Request::TxnAddPartitions(partitions))
995            .await
996            .and_then(|response| {
997                if let Response::TxnAddPartitions(inner) = response {
998                    inner.map_err(Into::into)
999                } else {
1000                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1001                }
1002            })
1003            .map_err(Into::into)
1004    }
1005
1006    #[instrument(skip_all)]
1007    async fn txn_offset_commit(
1008        &self,
1009        offsets: TxnOffsetCommitRequest,
1010    ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
1011        self.serve(Context::default(), Request::TxnOffsetCommit(offsets))
1012            .await
1013            .and_then(|response| {
1014                if let Response::TxnOffsetCommit(inner) = response {
1015                    inner.map_err(Into::into)
1016                } else {
1017                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1018                }
1019            })
1020            .map_err(Into::into)
1021    }
1022
1023    #[instrument(skip_all)]
1024    async fn txn_end(
1025        &self,
1026        transaction_id: &str,
1027        producer_id: i64,
1028        producer_epoch: i16,
1029        committed: bool,
1030    ) -> Result<ErrorCode> {
1031        let transaction_id = transaction_id.to_string();
1032
1033        self.serve(
1034            Context::default(),
1035            Request::TxnEnd {
1036                transaction_id,
1037                producer_id,
1038                producer_epoch,
1039                committed,
1040            },
1041        )
1042        .await
1043        .and_then(|response| {
1044            if let Response::TxnEnd(inner) = response {
1045                inner.map_err(Into::into)
1046            } else {
1047                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1048            }
1049        })
1050        .map_err(Into::into)
1051    }
1052
1053    #[instrument(skip_all)]
1054    async fn maintain(&self, now: SystemTime) -> Result<()> {
1055        self.serve(Context::default(), Request::Maintain(now))
1056            .await
1057            .and_then(|response| {
1058                if let Response::Maintain(inner) = response {
1059                    inner.map_err(Into::into)
1060                } else {
1061                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1062                }
1063            })
1064            .map_err(Into::into)
1065    }
1066
1067    #[instrument(skip_all)]
1068    async fn cluster_id(&self) -> Result<String> {
1069        self.serve(Context::default(), Request::ClusterId)
1070            .await
1071            .and_then(|response| {
1072                if let Response::ClusterId(inner) = response {
1073                    inner.map_err(Into::into)
1074                } else {
1075                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1076                }
1077            })
1078            .map_err(Into::into)
1079    }
1080
1081    #[instrument(skip_all)]
1082    async fn node(&self) -> Result<i32> {
1083        self.serve(Context::default(), Request::Node)
1084            .await
1085            .and_then(|response| {
1086                if let Response::Node(inner) = response {
1087                    inner.map_err(Into::into)
1088                } else {
1089                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1090                }
1091            })
1092            .map_err(Into::into)
1093    }
1094
1095    #[instrument(skip_all)]
1096    async fn advertised_listener(&self) -> Result<Url> {
1097        self.serve(Context::default(), Request::AdvertisedListener)
1098            .await
1099            .and_then(|response| {
1100                if let Response::AdvertisedListener(inner) = response {
1101                    inner.map_err(Into::into)
1102                } else {
1103                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1104                }
1105            })
1106            .map_err(Into::into)
1107    }
1108
1109    #[instrument(skip_all)]
1110    async fn delete_user_scram_credential(
1111        &self,
1112        user: &str,
1113        mechanism: ScramMechanism,
1114    ) -> Result<()> {
1115        let user = user.to_string();
1116
1117        self.serve(
1118            Context::default(),
1119            Request::DeleteUserScramCredential { user, mechanism },
1120        )
1121        .await
1122        .and_then(|response| {
1123            if let Response::DeleteUserScramCredential(inner) = response {
1124                inner.map_err(Into::into)
1125            } else {
1126                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1127            }
1128        })
1129        .map_err(Into::into)
1130    }
1131
1132    #[instrument(skip_all)]
1133    async fn upsert_user_scram_credential(
1134        &self,
1135        user: &str,
1136        mechanism: ScramMechanism,
1137        credential: ScramCredential,
1138    ) -> Result<()> {
1139        let user = user.to_string();
1140
1141        self.serve(
1142            Context::default(),
1143            Request::UpsertUserScramCredential {
1144                user,
1145                mechanism,
1146                credential,
1147            },
1148        )
1149        .await
1150        .and_then(|response| {
1151            if let Response::UpsertUserScramCredential(inner) = response {
1152                inner.map_err(Into::into)
1153            } else {
1154                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1155            }
1156        })
1157        .map_err(Into::into)
1158    }
1159
1160    #[instrument(skip_all)]
1161    async fn user_scram_credential(
1162        &self,
1163        user: &str,
1164        mechanism: ScramMechanism,
1165    ) -> Result<Option<ScramCredential>> {
1166        let user = user.to_string();
1167
1168        self.serve(
1169            Context::default(),
1170            Request::UserScramCredential { user, mechanism },
1171        )
1172        .await
1173        .and_then(|response| {
1174            if let Response::UserScramCredential(inner) = response {
1175                inner.map_err(Into::into)
1176            } else {
1177                Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1178            }
1179        })
1180        .map_err(Into::into)
1181    }
1182
1183    #[instrument(skip_all)]
1184    async fn ping(&self) -> Result<()> {
1185        self.serve(Context::default(), Request::Ping)
1186            .await
1187            .and_then(|response| {
1188                if let Response::Ping(inner) = response {
1189                    inner.map_err(Into::into)
1190                } else {
1191                    Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1192                }
1193            })
1194            .map_err(Into::into)
1195    }
1196}
1197
1198#[derive(Clone, Debug, Default)]
1199pub struct ChannelRequestLayer {
1200    cancellation: CancellationToken,
1201}
1202
1203impl ChannelRequestLayer {
1204    pub fn new(cancellation: CancellationToken) -> Self {
1205        Self { cancellation }
1206    }
1207}
1208
1209impl<S> Layer<S> for ChannelRequestLayer {
1210    type Service = ChannelRequestService<S>;
1211
1212    fn layer(&self, inner: S) -> Self::Service {
1213        Self::Service {
1214            inner,
1215            cancellation: self.cancellation.clone(),
1216        }
1217    }
1218}
1219
1220#[derive(Clone, Debug, Default)]
1221pub struct ChannelRequestService<S> {
1222    inner: S,
1223    cancellation: CancellationToken,
1224}
1225
1226impl<S, State> Service<State, RequestReceiver> for ChannelRequestService<S>
1227where
1228    S: Service<State, Request, Response = Response, Error = Error>,
1229    State: Clone + Send + Sync + 'static,
1230{
1231    type Response = ();
1232    type Error = Error;
1233
1234    async fn serve(
1235        &self,
1236        ctx: Context<State>,
1237        mut req: RequestReceiver,
1238    ) -> Result<Self::Response, Self::Error> {
1239        loop {
1240            tokio::select! {
1241                Some((request, tx)) = req.recv() => {
1242                    self.inner
1243                    .serve(ctx.clone(), request)
1244                    .await
1245                    .and_then(|response| {
1246                        tx.send(response).map_err(|_unsent| Error::UnableToSend)
1247                    })?
1248                }
1249
1250                cancelled = self.cancellation.cancelled() => {
1251                    debug!(?cancelled);
1252                    break;
1253                }
1254            }
1255        }
1256
1257        Ok(())
1258    }
1259}
1260
1261#[derive(Copy, Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
1262pub struct RequestStorageService<G> {
1263    storage: G,
1264}
1265
1266impl<G> RequestStorageService<G>
1267where
1268    G: Storage,
1269{
1270    pub fn new(storage: G) -> Self {
1271        Self { storage }
1272    }
1273}
1274
1275impl<G, State> Service<State, Request> for RequestStorageService<G>
1276where
1277    G: Storage,
1278    State: Clone + Send + Sync + 'static,
1279{
1280    type Response = Response;
1281    type Error = Error;
1282
1283    async fn serve(
1284        &self,
1285        _ctx: Context<State>,
1286        req: Request,
1287    ) -> Result<Self::Response, Self::Error> {
1288        match req {
1289            Request::RegisterBroker(broker_registration) => Ok(Response::RegisterBroker(
1290                self.storage.register_broker(broker_registration).await,
1291            )),
1292            Request::IncrementalAlterResource(alter_configs_resource) => {
1293                Ok(Response::IncrementalAlterResponse(
1294                    self.storage
1295                        .incremental_alter_resource(alter_configs_resource)
1296                        .await,
1297                ))
1298            }
1299            Request::CreateTopic {
1300                topic,
1301                validate_only,
1302            } => Ok(Response::CreateTopic(
1303                self.storage.create_topic(topic, validate_only).await,
1304            )),
1305            Request::DeleteRecords(delete_records_topics) => Ok(Response::DeleteRecords(
1306                self.storage
1307                    .delete_records(&delete_records_topics[..])
1308                    .await,
1309            )),
1310            Request::DeleteTopic(topic_id) => Ok(Response::DeleteTopic(
1311                self.storage.delete_topic(&topic_id).await,
1312            )),
1313            Request::Brokers => Ok(Response::Brokers(self.storage.brokers().await)),
1314            Request::Produce {
1315                transaction_id,
1316                topition,
1317                batch,
1318            } => Ok(Response::Produce(
1319                self.storage
1320                    .produce(transaction_id.as_deref(), &topition, batch)
1321                    .await,
1322            )),
1323            Request::Fetch {
1324                topition,
1325                offset,
1326                min_bytes,
1327                max_bytes,
1328                isolation,
1329            } => Ok(Response::Fetch(
1330                self.storage
1331                    .fetch(&topition, offset, min_bytes, max_bytes, isolation)
1332                    .await,
1333            )),
1334            Request::OffsetStage(topition) => Ok(Response::OffsetStage(
1335                self.storage.offset_stage(&topition).await,
1336            )),
1337            Request::ListOffsets {
1338                isolation_level,
1339                offsets,
1340            } => Ok(Response::ListOffsets(
1341                self.storage
1342                    .list_offsets(isolation_level, &offsets[..])
1343                    .await,
1344            )),
1345            Request::OffsetCommit {
1346                group_id,
1347                retention_time_ms,
1348                offsets,
1349            } => Ok(Response::OffsetCommit(
1350                self.storage
1351                    .offset_commit(&group_id, retention_time_ms, &offsets[..])
1352                    .await,
1353            )),
1354            Request::CommittedOffsetTopitions(group_id) => Ok(Response::CommittedOffsetTopitions(
1355                self.storage.committed_offset_topitions(&group_id).await,
1356            )),
1357            Request::OffsetFetch {
1358                group_id,
1359                topics,
1360                require_stable,
1361            } => Ok(Response::OffsetFetch(
1362                self.storage
1363                    .offset_fetch(group_id.as_deref(), &topics[..], require_stable)
1364                    .await,
1365            )),
1366            Request::Metadata(topic_ids) => Ok(Response::Metadata(
1367                self.storage.metadata(topic_ids.as_deref()).await,
1368            )),
1369            Request::DescribeConfig {
1370                name,
1371                resource,
1372                keys,
1373            } => Ok(Response::DescribeConfig(
1374                self.storage
1375                    .describe_config(&name, resource, keys.as_deref())
1376                    .await,
1377            )),
1378            Request::DescribeTopicPartitions {
1379                topics,
1380                partition_limit,
1381                cursor,
1382            } => Ok(Response::DescribeTopicPartitions(
1383                self.storage
1384                    .describe_topic_partitions(topics.as_deref(), partition_limit, cursor)
1385                    .await,
1386            )),
1387            Request::ListGroups(items) => Ok(Response::ListGroups(
1388                self.storage.list_groups(items.as_deref()).await,
1389            )),
1390            Request::DeleteGroups(items) => Ok(Response::DeleteGroups(
1391                self.storage.delete_groups(items.as_deref()).await,
1392            )),
1393            Request::DescribeGroups {
1394                group_ids,
1395                include_authorized_operations,
1396            } => Ok(Response::DescribeGroups(
1397                self.storage
1398                    .describe_groups(group_ids.as_deref(), include_authorized_operations)
1399                    .await,
1400            )),
1401            Request::UpdateGroup {
1402                group_id,
1403                detail,
1404                version,
1405            } => Ok(Response::UpdateGroup(
1406                self.storage.update_group(&group_id, detail, version).await,
1407            )),
1408            Request::InitProducer {
1409                transaction_id,
1410                transaction_timeout_ms,
1411                producer_id,
1412                producer_epoch,
1413            } => Ok(Response::InitProducer(
1414                self.storage
1415                    .init_producer(
1416                        transaction_id.as_deref(),
1417                        transaction_timeout_ms,
1418                        producer_id,
1419                        producer_epoch,
1420                    )
1421                    .await,
1422            )),
1423            Request::TxnAddOffsets {
1424                transaction_id,
1425                producer_id,
1426                producer_epoch,
1427                group_id,
1428            } => Ok(Response::TxnAddOffsets(
1429                self.storage
1430                    .txn_add_offsets(&transaction_id, producer_id, producer_epoch, &group_id)
1431                    .await,
1432            )),
1433            Request::TxnAddPartitions(txn_add_partitions_request) => {
1434                Ok(Response::TxnAddPartitions(
1435                    self.storage
1436                        .txn_add_partitions(txn_add_partitions_request)
1437                        .await,
1438                ))
1439            }
1440            Request::TxnOffsetCommit(txn_offset_commit_request) => Ok(Response::TxnOffsetCommit(
1441                self.storage
1442                    .txn_offset_commit(txn_offset_commit_request)
1443                    .await,
1444            )),
1445            Request::TxnEnd {
1446                transaction_id,
1447                producer_id,
1448                producer_epoch,
1449                committed,
1450            } => Ok(Response::TxnEnd(
1451                self.storage
1452                    .txn_end(&transaction_id, producer_id, producer_epoch, committed)
1453                    .await,
1454            )),
1455            Request::Maintain(now) => Ok(Response::Maintain(self.storage.maintain(now).await)),
1456            Request::ClusterId => Ok(Response::ClusterId(self.storage.cluster_id().await)),
1457            Request::Node => Ok(Response::Node(self.storage.node().await)),
1458            Request::AdvertisedListener => Ok(Response::AdvertisedListener(
1459                self.storage.advertised_listener().await,
1460            )),
1461            Request::DeleteUserScramCredential { user, mechanism } => {
1462                Ok(Response::DeleteUserScramCredential(
1463                    self.storage
1464                        .delete_user_scram_credential(&user[..], mechanism)
1465                        .await,
1466                ))
1467            }
1468            Request::UpsertUserScramCredential {
1469                user,
1470                mechanism,
1471                credential,
1472            } => Ok(Response::UpsertUserScramCredential(
1473                self.storage
1474                    .upsert_user_scram_credential(&user[..], mechanism, credential)
1475                    .await,
1476            )),
1477            Request::UserScramCredential { user, mechanism } => Ok(Response::UserScramCredential(
1478                self.storage
1479                    .user_scram_credential(&user[..], mechanism)
1480                    .await,
1481            )),
1482            Request::Ping => Ok(Response::Ping(self.storage.ping().await)),
1483        }
1484    }
1485}