1mod alter_user_scram_credentials;
16mod consumer_group_describe;
17mod create_acls;
18mod create_topics;
19mod delete_groups;
20mod delete_records;
21mod delete_topics;
22mod describe_acls;
23mod describe_cluster;
24mod describe_configs;
25mod describe_groups;
26mod describe_topic_partitions;
27mod describe_user_scram_credentials;
28mod fetch;
29mod find_coordinator;
30mod get_telemetry_subscriptions;
31mod incremental_alter_configs;
32mod init_producer_id;
33mod list_groups;
34mod list_offsets;
35mod list_partition_reassignments;
36mod metadata;
37mod produce;
38mod txn;
39
40use std::{
41 collections::BTreeMap,
42 fmt::{self, Debug, Display, Formatter},
43 sync::LazyLock,
44 time::{Duration, SystemTime},
45};
46
47pub use alter_user_scram_credentials::AlterUserScramCredentialsService;
48use async_trait::async_trait;
49pub use consumer_group_describe::ConsumerGroupDescribeService;
50pub use create_acls::CreateAclsService;
51pub use create_topics::CreateTopicsService;
52pub use delete_groups::DeleteGroupsService;
53pub use delete_records::DeleteRecordsService;
54pub use delete_topics::DeleteTopicsService;
55pub use describe_acls::DescribeAclsService;
56pub use describe_cluster::DescribeClusterService;
57pub use describe_configs::DescribeConfigsService;
58pub use describe_groups::DescribeGroupsService;
59pub use describe_topic_partitions::DescribeTopicPartitionsService;
60pub use describe_user_scram_credentials::DescribeUserScramCredentialsService;
61pub use fetch::FetchService;
62pub use find_coordinator::FindCoordinatorService;
63pub use get_telemetry_subscriptions::GetTelemetrySubscriptionsService;
64pub use incremental_alter_configs::IncrementalAlterConfigsService;
65pub use init_producer_id::InitProducerIdService;
66pub use list_groups::ListGroupsService;
67pub use list_offsets::ListOffsetsService;
68pub use list_partition_reassignments::ListPartitionReassignmentsService;
69pub use metadata::MetadataService;
70use opentelemetry::{
71 KeyValue,
72 metrics::{Counter, Gauge, Histogram},
73};
74pub use produce::ProduceService;
75use rama::{Context, Layer, Service};
76use tansu_sans_io::{
77 ConfigResource, ErrorCode, IsolationLevel, ListOffset, ScramMechanism,
78 create_topics_request::CreatableTopic, delete_groups_response::DeletableGroupResult,
79 delete_records_request::DeleteRecordsTopic, delete_records_response::DeleteRecordsTopicResult,
80 describe_cluster_response::DescribeClusterBroker,
81 describe_configs_response::DescribeConfigsResult,
82 describe_topic_partitions_response::DescribeTopicPartitionsResponseTopic,
83 incremental_alter_configs_request::AlterConfigsResource,
84 incremental_alter_configs_response::AlterConfigsResourceResponse,
85 list_groups_response::ListedGroup, record::deflated,
86 txn_offset_commit_response::TxnOffsetCommitResponseTopic,
87};
88use tokio::sync::{
89 mpsc::{self, error::SendError},
90 oneshot,
91};
92use tokio_util::sync::CancellationToken;
93use tracing::{debug, error, instrument};
94pub use txn::add_offsets::AddOffsetsService as TxnAddOffsetsService;
95pub use txn::add_partitions::AddPartitionService as TxnAddPartitionService;
96pub use txn::offset_commit::OffsetCommitService as TxnOffsetCommitService;
97use url::Url;
98use uuid::Uuid;
99
100use crate::{
101 BrokerRegistrationRequest, Error, GroupDetail, ListOffsetResponse, METER, MetadataResponse,
102 NamedGroupDetail, OffsetCommitRequest, OffsetStage, ProducerIdResponse, Result,
103 ScramCredential, Storage, TopicId, Topition, TxnAddPartitionsRequest, TxnAddPartitionsResponse,
104 TxnOffsetCommitRequest, UpdateError, Version,
105};
106
107#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
108pub enum Request {
109 RegisterBroker(BrokerRegistrationRequest),
110 IncrementalAlterResource(AlterConfigsResource),
111 CreateTopic {
112 topic: CreatableTopic,
113 validate_only: bool,
114 },
115 DeleteRecords(Vec<DeleteRecordsTopic>),
116 DeleteTopic(TopicId),
117 Brokers,
118 Produce {
119 transaction_id: Option<String>,
120 topition: Topition,
121 batch: deflated::Batch,
122 },
123 Fetch {
124 topition: Topition,
125 offset: i64,
126 min_bytes: u32,
127 max_bytes: u32,
128 isolation: IsolationLevel,
129 },
130 OffsetStage(Topition),
131 ListOffsets {
132 isolation_level: IsolationLevel,
133 offsets: Vec<(Topition, ListOffset)>,
134 },
135 OffsetCommit {
136 group_id: String,
137 retention_time_ms: Option<Duration>,
138 offsets: Vec<(Topition, OffsetCommitRequest)>,
139 },
140 CommittedOffsetTopitions(String),
141 OffsetFetch {
142 group_id: Option<String>,
143 topics: Vec<Topition>,
144 require_stable: Option<bool>,
145 },
146 Metadata(Option<Vec<TopicId>>),
147 DescribeConfig {
148 name: String,
149 resource: ConfigResource,
150 keys: Option<Vec<String>>,
151 },
152 DescribeTopicPartitions {
153 topics: Option<Vec<TopicId>>,
154 partition_limit: i32,
155 cursor: Option<Topition>,
156 },
157 ListGroups(Option<Vec<String>>),
158 DeleteGroups(Option<Vec<String>>),
159 DescribeGroups {
160 group_ids: Option<Vec<String>>,
161 include_authorized_operations: bool,
162 },
163 UpdateGroup {
164 group_id: String,
165 detail: GroupDetail,
166 version: Option<Version>,
167 },
168 InitProducer {
169 transaction_id: Option<String>,
170 transaction_timeout_ms: i32,
171 producer_id: Option<i64>,
172 producer_epoch: Option<i16>,
173 },
174 TxnAddOffsets {
175 transaction_id: String,
176 producer_id: i64,
177 producer_epoch: i16,
178 group_id: String,
179 },
180 TxnAddPartitions(TxnAddPartitionsRequest),
181 TxnOffsetCommit(TxnOffsetCommitRequest),
182 TxnEnd {
183 transaction_id: String,
184 producer_id: i64,
185 producer_epoch: i16,
186 committed: bool,
187 },
188 Maintain(SystemTime),
189 ClusterId,
190 Node,
191 AdvertisedListener,
192 DeleteUserScramCredential {
193 user: String,
194 mechanism: ScramMechanism,
195 },
196 UpsertUserScramCredential {
197 user: String,
198 mechanism: ScramMechanism,
199 credential: ScramCredential,
200 },
201 UserScramCredential {
202 user: String,
203 mechanism: ScramMechanism,
204 },
205 Ping,
206}
207
208impl Display for Request {
209 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
210 match self {
211 Self::AdvertisedListener => f.write_str("AdvertisedListener"),
212 Self::Brokers => f.write_str("Brokers"),
213 Self::ClusterId => f.write_str("ClusterId"),
214 Self::CommittedOffsetTopitions(_) => f.write_str("CommittedOffsetTopitions"),
215 Self::CreateTopic { .. } => f.write_str("CreateTopic"),
216 Self::DeleteGroups(_) => f.write_str("DeleteGroups"),
217 Self::DeleteRecords(_) => f.write_str("DeleteRecords"),
218 Self::DeleteTopic(_) => f.write_str("DeleteTopic"),
219 Self::DescribeConfig { .. } => f.write_str("DescribeConfig"),
220 Self::DescribeGroups { .. } => f.write_str("DescribeGroups"),
221 Self::DescribeTopicPartitions { .. } => f.write_str("DescribeTopicPartitions"),
222 Self::Fetch { .. } => f.write_str("Fetch"),
223 Self::IncrementalAlterResource(_) => f.write_str("IncrementalAlterResource"),
224 Self::InitProducer { .. } => f.write_str("InitProducer"),
225 Self::ListGroups(_) => f.write_str("ListGroups"),
226 Self::ListOffsets { .. } => f.write_str("ListOffsets"),
227 Self::Maintain(_) => f.write_str("Maintain"),
228 Self::Metadata(_) => f.write_str("Metadata"),
229 Self::Node => f.write_str("Node"),
230 Self::OffsetCommit { .. } => f.write_str("OffsetCommit"),
231 Self::OffsetFetch { .. } => f.write_str("OffsetFetch"),
232 Self::OffsetStage(_) => f.write_str("OffsetStage"),
233 Self::Produce { .. } => f.write_str("Produce"),
234 Self::RegisterBroker(_) => f.write_str("RegisterBroker"),
235 Self::TxnAddOffsets { .. } => f.write_str("TxnAddOffsets"),
236 Self::TxnAddPartitions(_) => f.write_str("TxnAddPartitions"),
237 Self::TxnEnd { .. } => f.write_str("TxnEnd"),
238 Self::TxnOffsetCommit(_) => f.write_str("TxnOffsetCommit"),
239 Self::UpdateGroup { .. } => f.write_str("UpdateGroup"),
240 Self::DeleteUserScramCredential { .. } => f.write_str("DeleteUserScramCredential"),
241 Self::UpsertUserScramCredential { .. } => f.write_str("UpsertUserScramCredential"),
242 Self::UserScramCredential { .. } => f.write_str("UserScramCredential"),
243 Self::Ping => f.write_str("Ping"),
244 }
245 }
246}
247
248#[derive(Clone, Debug)]
249pub enum Response {
250 AdvertisedListener(Result<Url>),
251 Brokers(Result<Vec<DescribeClusterBroker>>),
252 ClusterId(Result<String>),
253 CommittedOffsetTopitions(Result<BTreeMap<Topition, i64>>),
254 CreateTopic(Result<Uuid>),
255 DeleteGroups(Result<Vec<DeletableGroupResult>>),
256 DeleteRecords(Result<Vec<DeleteRecordsTopicResult>>),
257 DeleteTopic(Result<ErrorCode>),
258 DeleteUserScramCredential(Result<()>),
259 DescribeConfig(Result<DescribeConfigsResult>),
260 DescribeGroups(Result<Vec<NamedGroupDetail>>),
261 DescribeTopicPartitions(Result<Vec<DescribeTopicPartitionsResponseTopic>>),
262 Fetch(Result<Vec<deflated::Batch>>),
263 IncrementalAlterResponse(Result<AlterConfigsResourceResponse>),
264 InitProducer(Result<ProducerIdResponse>),
265 ListGroups(Result<Vec<ListedGroup>>),
266 ListOffsets(Result<Vec<(Topition, ListOffsetResponse)>>),
267 Maintain(Result<()>),
268 Metadata(Result<MetadataResponse>),
269 Node(Result<i32>),
270 OffsetCommit(Result<Vec<(Topition, ErrorCode)>>),
271 OffsetFetch(Result<BTreeMap<Topition, i64>>),
272 OffsetStage(Result<OffsetStage>),
273 Ping(Result<()>),
274 Produce(Result<i64>),
275 RegisterBroker(Result<()>),
276 TxnAddOffsets(Result<ErrorCode>),
277 TxnAddPartitions(Result<TxnAddPartitionsResponse>),
278 TxnEnd(Result<ErrorCode>),
279 TxnOffsetCommit(Result<Vec<TxnOffsetCommitResponseTopic>>),
280 UpdateGroup(Result<Version, UpdateError<GroupDetail>>),
281 UpsertUserScramCredential(Result<()>),
282 UserScramCredential(Result<Option<ScramCredential>>),
283}
284
285pub type RequestSender = mpsc::Sender<(Request, oneshot::Sender<Response>)>;
286pub type RequestReceiver = mpsc::Receiver<(Request, oneshot::Sender<Response>)>;
287
288pub fn bounded_channel(buffer: usize) -> (RequestSender, RequestReceiver) {
289 mpsc::channel::<(Request, oneshot::Sender<Response>)>(buffer)
290}
291
292#[derive(Clone, Debug, thiserror::Error)]
293pub enum ServiceError {
294 Storage(Error),
295 UpdateGroupDetail(UpdateError<GroupDetail>),
296}
297
298impl Display for ServiceError {
299 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
300 write!(f, "{self:?}")
301 }
302}
303
304impl From<SendError<()>> for ServiceError {
305 fn from(_value: SendError<()>) -> Self {
306 Self::Storage(Error::UnableToSend)
307 }
308}
309
310impl From<Error> for ServiceError {
311 fn from(value: Error) -> Self {
312 Self::Storage(value)
313 }
314}
315
316impl From<UpdateError<GroupDetail>> for ServiceError {
317 fn from(value: UpdateError<GroupDetail>) -> Self {
318 Self::UpdateGroupDetail(value)
319 }
320}
321
322impl From<ServiceError> for Error {
323 fn from(value: ServiceError) -> Self {
324 if let ServiceError::Storage(error) = value {
325 error
326 } else {
327 unreachable!()
328 }
329 }
330}
331
332impl From<ServiceError> for UpdateError<GroupDetail> {
333 fn from(value: ServiceError) -> Self {
334 if let ServiceError::UpdateGroupDetail(error) = value {
335 error
336 } else {
337 unreachable!()
338 }
339 }
340}
341
342#[derive(Copy, Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
343pub struct RequestLayer;
344
345impl<S> Layer<S> for RequestLayer {
346 type Service = RequestService<S>;
347
348 fn layer(&self, inner: S) -> Self::Service {
349 Self::Service { inner }
350 }
351}
352
353#[derive(Copy, Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
354pub struct RequestService<S> {
355 inner: S,
356}
357
358impl<State, S> Service<State, Request> for RequestService<S>
359where
360 S: Service<State, Request>,
361 State: Send + Sync + 'static,
362{
363 type Response = S::Response;
364 type Error = S::Error;
365
366 async fn serve(
367 &self,
368 ctx: Context<State>,
369 req: Request,
370 ) -> Result<Self::Response, Self::Error> {
371 debug!(?req);
372 self.inner.serve(ctx, req).await
373 }
374}
375
376#[derive(Clone, Debug)]
378pub struct RequestChannelService {
379 tx: RequestSender,
380}
381
382impl RequestChannelService {
383 pub fn new(tx: RequestSender) -> Self {
384 Self { tx }
385 }
386
387 fn elapsed_millis(&self, start: SystemTime) -> u64 {
388 start
389 .elapsed()
390 .map_or(0, |duration| duration.as_millis() as u64)
391 }
392}
393
394static STORAGE_CHANNEL_CAPACITY: LazyLock<Gauge<u64>> = LazyLock::new(|| {
395 METER
396 .u64_gauge("tansu_storage_channel_capacity")
397 .with_description("Storage channel capacity")
398 .build()
399});
400
401impl<State> Service<State, Request> for RequestChannelService
402where
403 State: Send + Sync + 'static,
404{
405 type Response = Response;
406 type Error = ServiceError;
407
408 #[instrument(skip_all)]
409 async fn serve(
410 &self,
411 ctx: Context<State>,
412 req: Request,
413 ) -> Result<Self::Response, Self::Error> {
414 let _ = ctx;
415 let (resp_tx, resp_rx) = oneshot::channel();
416
417 let start = SystemTime::now();
418
419 let operation = req.to_string();
420 let attributes = [KeyValue::new("operation", operation.clone())];
421
422 let capacity = self.tx.capacity();
423 STORAGE_CHANNEL_CAPACITY.record(capacity as u64, &attributes);
424 debug!(operation, capacity);
425
426 self.tx
427 .reserve()
428 .await
429 .map(|permit| permit.send((req, resp_tx)))
430 .inspect(|_| {
431 let permit_elapsed = self.elapsed_millis(start);
432 STORAGE_CHANNEL_PERMIT_DURATION.record(permit_elapsed, &attributes);
433 debug!(operation, permit_elapsed);
434 })
435 .inspect_err(|err| {
436 error!(operation, ?err);
437 STORAGE_CHANNEL_ERROR.add(1, &attributes);
438 })?;
439
440 resp_rx
441 .await
442 .map_err(|_| Error::OneshotRecv.into())
443 .inspect(|_| {
444 let elapsed_millis = self.elapsed_millis(start);
445 STORAGE_CHANNEL_REQUEST_DURATION.record(elapsed_millis, &attributes);
446 debug!(operation, elapsed_millis);
447 })
448 .inspect_err(|err| {
449 error!(operation, ?err);
450 STORAGE_CHANNEL_ERROR.add(1, &attributes);
451 })
452 }
453}
454
455static STORAGE_CHANNEL_REQUEST_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
456 METER
457 .u64_histogram("tansu_storage_channel_request_duration")
458 .with_unit("ms")
459 .with_description("Storage channel request latency in milliseconds")
460 .build()
461});
462
463static STORAGE_CHANNEL_PERMIT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
464 METER
465 .u64_histogram("tansu_storage_channel_permit_duration")
466 .with_unit("ms")
467 .with_description("Storage channel permit latency in milliseconds")
468 .build()
469});
470
471static STORAGE_CHANNEL_ERROR: LazyLock<Counter<u64>> = LazyLock::new(|| {
472 METER
473 .u64_counter("tansu_storage_channel_error")
474 .with_description("Storage channel error count")
475 .build()
476});
477
478#[async_trait]
479impl Storage for RequestChannelService {
480 #[instrument(skip_all)]
481 async fn register_broker(&self, broker_registration: BrokerRegistrationRequest) -> Result<()> {
482 self.serve(
483 Context::default(),
484 Request::RegisterBroker(broker_registration),
485 )
486 .await
487 .and_then(|response| {
488 if let Response::RegisterBroker(inner) = response {
489 inner.map_err(Into::into)
490 } else {
491 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
492 }
493 })
494 .map_err(Into::into)
495 }
496
497 #[instrument(skip_all)]
498 async fn incremental_alter_resource(
499 &self,
500 resource: AlterConfigsResource,
501 ) -> Result<AlterConfigsResourceResponse> {
502 self.serve(
503 Context::default(),
504 Request::IncrementalAlterResource(resource),
505 )
506 .await
507 .and_then(|response| {
508 if let Response::IncrementalAlterResponse(inner) = response {
509 inner.map_err(Into::into)
510 } else {
511 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
512 }
513 })
514 .map_err(Into::into)
515 }
516
517 #[instrument(skip_all)]
518 async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
519 self.serve(
520 Context::default(),
521 Request::CreateTopic {
522 topic,
523 validate_only,
524 },
525 )
526 .await
527 .and_then(|response| {
528 if let Response::CreateTopic(inner) = response {
529 inner.map_err(Into::into)
530 } else {
531 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
532 }
533 })
534 .map_err(Into::into)
535 }
536
537 #[instrument(skip_all)]
538 async fn delete_records(
539 &self,
540 topics: &[DeleteRecordsTopic],
541 ) -> Result<Vec<DeleteRecordsTopicResult>> {
542 self.serve(
543 Context::default(),
544 Request::DeleteRecords(Vec::from(topics)),
545 )
546 .await
547 .and_then(|response| {
548 if let Response::DeleteRecords(inner) = response {
549 inner.map_err(Into::into)
550 } else {
551 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
552 }
553 })
554 .map_err(Into::into)
555 }
556
557 #[instrument(skip_all)]
558 async fn delete_topic(&self, topic: &TopicId) -> Result<ErrorCode> {
559 self.serve(Context::default(), Request::DeleteTopic(topic.to_owned()))
560 .await
561 .and_then(|response| {
562 if let Response::DeleteTopic(inner) = response {
563 inner.map_err(Into::into)
564 } else {
565 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
566 }
567 })
568 .map_err(Into::into)
569 }
570
571 #[instrument(skip_all)]
572 async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
573 self.serve(Context::default(), Request::Brokers)
574 .await
575 .and_then(|response| {
576 if let Response::Brokers(inner) = response {
577 inner.map_err(Into::into)
578 } else {
579 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
580 }
581 })
582 .map_err(Into::into)
583 }
584
585 #[instrument(skip_all)]
586 async fn produce(
587 &self,
588 transaction_id: Option<&str>,
589 topition: &Topition,
590 batch: deflated::Batch,
591 ) -> Result<i64> {
592 let transaction_id = transaction_id.map(|s| s.to_string());
593 let topition = topition.to_owned();
594
595 self.serve(
596 Context::default(),
597 Request::Produce {
598 transaction_id,
599 topition,
600 batch,
601 },
602 )
603 .await
604 .and_then(|response| {
605 if let Response::Produce(inner) = response {
606 inner.map_err(Into::into)
607 } else {
608 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
609 }
610 })
611 .map_err(Into::into)
612 }
613
614 #[instrument(skip_all)]
615 async fn fetch(
616 &self,
617 topition: &'_ Topition,
618 offset: i64,
619 min_bytes: u32,
620 max_bytes: u32,
621 isolation: IsolationLevel,
622 ) -> Result<Vec<deflated::Batch>> {
623 let topition = topition.to_owned();
624
625 self.serve(
626 Context::default(),
627 Request::Fetch {
628 topition,
629 offset,
630 min_bytes,
631 max_bytes,
632 isolation,
633 },
634 )
635 .await
636 .and_then(|response| {
637 if let Response::Fetch(inner) = response {
638 inner.map_err(Into::into)
639 } else {
640 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
641 }
642 })
643 .map_err(Into::into)
644 }
645
646 #[instrument(skip_all)]
647 async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
648 self.serve(
649 Context::default(),
650 Request::OffsetStage(topition.to_owned()),
651 )
652 .await
653 .and_then(|response| {
654 if let Response::OffsetStage(inner) = response {
655 inner.map_err(Into::into)
656 } else {
657 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
658 }
659 })
660 .map_err(Into::into)
661 }
662
663 #[instrument(skip_all)]
664 async fn list_offsets(
665 &self,
666 isolation_level: IsolationLevel,
667 offsets: &[(Topition, ListOffset)],
668 ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
669 let offsets = Vec::from(offsets);
670
671 self.serve(
672 Context::default(),
673 Request::ListOffsets {
674 isolation_level,
675 offsets,
676 },
677 )
678 .await
679 .and_then(|response| {
680 if let Response::ListOffsets(inner) = response {
681 inner.map_err(Into::into)
682 } else {
683 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
684 }
685 })
686 .map_err(Into::into)
687 }
688
689 #[instrument(skip_all)]
690 async fn offset_commit(
691 &self,
692 group_id: &str,
693 retention_time_ms: Option<Duration>,
694 offsets: &[(Topition, OffsetCommitRequest)],
695 ) -> Result<Vec<(Topition, ErrorCode)>> {
696 let group_id = group_id.to_string();
697 let offsets = Vec::from(offsets);
698
699 self.serve(
700 Context::default(),
701 Request::OffsetCommit {
702 group_id,
703 retention_time_ms,
704 offsets,
705 },
706 )
707 .await
708 .and_then(|response| {
709 if let Response::OffsetCommit(inner) = response {
710 inner.map_err(Into::into)
711 } else {
712 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
713 }
714 })
715 .map_err(Into::into)
716 }
717
718 #[instrument(skip_all)]
719 async fn committed_offset_topitions(&self, group_id: &str) -> Result<BTreeMap<Topition, i64>> {
720 let group_id = group_id.to_string();
721
722 self.serve(
723 Context::default(),
724 Request::CommittedOffsetTopitions(group_id),
725 )
726 .await
727 .and_then(|response| {
728 if let Response::CommittedOffsetTopitions(inner) = response {
729 inner.map_err(Into::into)
730 } else {
731 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
732 }
733 })
734 .map_err(Into::into)
735 }
736
737 #[instrument(skip_all)]
738 async fn offset_fetch(
739 &self,
740 group_id: Option<&str>,
741 topics: &[Topition],
742 require_stable: Option<bool>,
743 ) -> Result<BTreeMap<Topition, i64>> {
744 let group_id = group_id.map(|s| s.to_string());
745 let topics = Vec::from(topics);
746
747 self.serve(
748 Context::default(),
749 Request::OffsetFetch {
750 group_id,
751 topics,
752 require_stable,
753 },
754 )
755 .await
756 .and_then(|response| {
757 if let Response::OffsetFetch(inner) = response {
758 inner.map_err(Into::into)
759 } else {
760 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
761 }
762 })
763 .map_err(Into::into)
764 }
765
766 #[instrument(skip_all)]
767 async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
768 let topics = topics.map(Vec::from);
769
770 self.serve(Context::default(), Request::Metadata(topics))
771 .await
772 .and_then(|response| {
773 if let Response::Metadata(inner) = response {
774 inner.map_err(Into::into)
775 } else {
776 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
777 }
778 })
779 .map_err(Into::into)
780 }
781
782 #[instrument(skip_all)]
783 async fn describe_config(
784 &self,
785 name: &str,
786 resource: ConfigResource,
787 keys: Option<&[String]>,
788 ) -> Result<DescribeConfigsResult> {
789 let name = name.to_string();
790 let keys = keys.map(Vec::from);
791
792 self.serve(
793 Context::default(),
794 Request::DescribeConfig {
795 name,
796 resource,
797 keys,
798 },
799 )
800 .await
801 .and_then(|response| {
802 if let Response::DescribeConfig(inner) = response {
803 inner.map_err(Into::into)
804 } else {
805 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
806 }
807 })
808 .map_err(Into::into)
809 }
810
811 #[instrument(skip_all)]
812 async fn describe_topic_partitions(
813 &self,
814 topics: Option<&[TopicId]>,
815 partition_limit: i32,
816 cursor: Option<Topition>,
817 ) -> Result<Vec<DescribeTopicPartitionsResponseTopic>> {
818 let topics = topics.map(Vec::from);
819
820 self.serve(
821 Context::default(),
822 Request::DescribeTopicPartitions {
823 topics,
824 partition_limit,
825 cursor,
826 },
827 )
828 .await
829 .and_then(|response| {
830 if let Response::DescribeTopicPartitions(inner) = response {
831 inner.map_err(Into::into)
832 } else {
833 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
834 }
835 })
836 .map_err(Into::into)
837 }
838
839 #[instrument(skip_all)]
840 async fn list_groups(&self, states_filter: Option<&[String]>) -> Result<Vec<ListedGroup>> {
841 let states_filter = states_filter.map(Vec::from);
842
843 self.serve(Context::default(), Request::ListGroups(states_filter))
844 .await
845 .and_then(|response| {
846 if let Response::ListGroups(inner) = response {
847 inner.map_err(Into::into)
848 } else {
849 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
850 }
851 })
852 .map_err(Into::into)
853 }
854
855 #[instrument(skip_all)]
856 async fn delete_groups(
857 &self,
858 group_ids: Option<&[String]>,
859 ) -> Result<Vec<DeletableGroupResult>> {
860 let group_ids = group_ids.map(Vec::from);
861
862 self.serve(Context::default(), Request::DeleteGroups(group_ids))
863 .await
864 .and_then(|response| {
865 if let Response::DeleteGroups(inner) = response {
866 inner.map_err(Into::into)
867 } else {
868 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
869 }
870 })
871 .map_err(Into::into)
872 }
873
874 #[instrument(skip_all)]
875 async fn describe_groups(
876 &self,
877 group_ids: Option<&[String]>,
878 include_authorized_operations: bool,
879 ) -> Result<Vec<NamedGroupDetail>> {
880 let group_ids = group_ids.map(Vec::from);
881
882 self.serve(
883 Context::default(),
884 Request::DescribeGroups {
885 group_ids,
886 include_authorized_operations,
887 },
888 )
889 .await
890 .and_then(|response| {
891 if let Response::DescribeGroups(inner) = response {
892 inner.map_err(Into::into)
893 } else {
894 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
895 }
896 })
897 .map_err(Into::into)
898 }
899
900 #[instrument(skip_all)]
901 async fn update_group(
902 &self,
903 group_id: &str,
904 detail: GroupDetail,
905 version: Option<Version>,
906 ) -> Result<Version, UpdateError<GroupDetail>> {
907 let group_id = group_id.to_string();
908
909 self.serve(
910 Context::default(),
911 Request::UpdateGroup {
912 group_id,
913 detail,
914 version,
915 },
916 )
917 .await
918 .and_then(|response| {
919 if let Response::UpdateGroup(inner) = response {
920 inner.map_err(Into::into)
921 } else {
922 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
923 }
924 })
925 .map_err(Into::into)
926 }
927
928 #[instrument(skip_all)]
929 async fn init_producer(
930 &self,
931 transaction_id: Option<&str>,
932 transaction_timeout_ms: i32,
933 producer_id: Option<i64>,
934 producer_epoch: Option<i16>,
935 ) -> Result<ProducerIdResponse> {
936 let transaction_id = transaction_id.map(|transaction_id| transaction_id.to_owned());
937
938 self.serve(
939 Context::default(),
940 Request::InitProducer {
941 transaction_id,
942 transaction_timeout_ms,
943 producer_id,
944 producer_epoch,
945 },
946 )
947 .await
948 .and_then(|response| {
949 if let Response::InitProducer(inner) = response {
950 inner.map_err(Into::into)
951 } else {
952 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
953 }
954 })
955 .map_err(Into::into)
956 }
957
958 #[instrument(skip_all)]
959 async fn txn_add_offsets(
960 &self,
961 transaction_id: &str,
962 producer_id: i64,
963 producer_epoch: i16,
964 group_id: &str,
965 ) -> Result<ErrorCode> {
966 let transaction_id = transaction_id.to_string();
967 let group_id = group_id.to_string();
968
969 self.serve(
970 Context::default(),
971 Request::TxnAddOffsets {
972 transaction_id,
973 producer_id,
974 producer_epoch,
975 group_id,
976 },
977 )
978 .await
979 .and_then(|response| {
980 if let Response::TxnAddOffsets(inner) = response {
981 inner.map_err(Into::into)
982 } else {
983 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
984 }
985 })
986 .map_err(Into::into)
987 }
988
989 #[instrument(skip_all)]
990 async fn txn_add_partitions(
991 &self,
992 partitions: TxnAddPartitionsRequest,
993 ) -> Result<TxnAddPartitionsResponse> {
994 self.serve(Context::default(), Request::TxnAddPartitions(partitions))
995 .await
996 .and_then(|response| {
997 if let Response::TxnAddPartitions(inner) = response {
998 inner.map_err(Into::into)
999 } else {
1000 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1001 }
1002 })
1003 .map_err(Into::into)
1004 }
1005
1006 #[instrument(skip_all)]
1007 async fn txn_offset_commit(
1008 &self,
1009 offsets: TxnOffsetCommitRequest,
1010 ) -> Result<Vec<TxnOffsetCommitResponseTopic>> {
1011 self.serve(Context::default(), Request::TxnOffsetCommit(offsets))
1012 .await
1013 .and_then(|response| {
1014 if let Response::TxnOffsetCommit(inner) = response {
1015 inner.map_err(Into::into)
1016 } else {
1017 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1018 }
1019 })
1020 .map_err(Into::into)
1021 }
1022
1023 #[instrument(skip_all)]
1024 async fn txn_end(
1025 &self,
1026 transaction_id: &str,
1027 producer_id: i64,
1028 producer_epoch: i16,
1029 committed: bool,
1030 ) -> Result<ErrorCode> {
1031 let transaction_id = transaction_id.to_string();
1032
1033 self.serve(
1034 Context::default(),
1035 Request::TxnEnd {
1036 transaction_id,
1037 producer_id,
1038 producer_epoch,
1039 committed,
1040 },
1041 )
1042 .await
1043 .and_then(|response| {
1044 if let Response::TxnEnd(inner) = response {
1045 inner.map_err(Into::into)
1046 } else {
1047 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1048 }
1049 })
1050 .map_err(Into::into)
1051 }
1052
1053 #[instrument(skip_all)]
1054 async fn maintain(&self, now: SystemTime) -> Result<()> {
1055 self.serve(Context::default(), Request::Maintain(now))
1056 .await
1057 .and_then(|response| {
1058 if let Response::Maintain(inner) = response {
1059 inner.map_err(Into::into)
1060 } else {
1061 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1062 }
1063 })
1064 .map_err(Into::into)
1065 }
1066
1067 #[instrument(skip_all)]
1068 async fn cluster_id(&self) -> Result<String> {
1069 self.serve(Context::default(), Request::ClusterId)
1070 .await
1071 .and_then(|response| {
1072 if let Response::ClusterId(inner) = response {
1073 inner.map_err(Into::into)
1074 } else {
1075 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1076 }
1077 })
1078 .map_err(Into::into)
1079 }
1080
1081 #[instrument(skip_all)]
1082 async fn node(&self) -> Result<i32> {
1083 self.serve(Context::default(), Request::Node)
1084 .await
1085 .and_then(|response| {
1086 if let Response::Node(inner) = response {
1087 inner.map_err(Into::into)
1088 } else {
1089 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1090 }
1091 })
1092 .map_err(Into::into)
1093 }
1094
1095 #[instrument(skip_all)]
1096 async fn advertised_listener(&self) -> Result<Url> {
1097 self.serve(Context::default(), Request::AdvertisedListener)
1098 .await
1099 .and_then(|response| {
1100 if let Response::AdvertisedListener(inner) = response {
1101 inner.map_err(Into::into)
1102 } else {
1103 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1104 }
1105 })
1106 .map_err(Into::into)
1107 }
1108
1109 #[instrument(skip_all)]
1110 async fn delete_user_scram_credential(
1111 &self,
1112 user: &str,
1113 mechanism: ScramMechanism,
1114 ) -> Result<()> {
1115 let user = user.to_string();
1116
1117 self.serve(
1118 Context::default(),
1119 Request::DeleteUserScramCredential { user, mechanism },
1120 )
1121 .await
1122 .and_then(|response| {
1123 if let Response::DeleteUserScramCredential(inner) = response {
1124 inner.map_err(Into::into)
1125 } else {
1126 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1127 }
1128 })
1129 .map_err(Into::into)
1130 }
1131
1132 #[instrument(skip_all)]
1133 async fn upsert_user_scram_credential(
1134 &self,
1135 user: &str,
1136 mechanism: ScramMechanism,
1137 credential: ScramCredential,
1138 ) -> Result<()> {
1139 let user = user.to_string();
1140
1141 self.serve(
1142 Context::default(),
1143 Request::UpsertUserScramCredential {
1144 user,
1145 mechanism,
1146 credential,
1147 },
1148 )
1149 .await
1150 .and_then(|response| {
1151 if let Response::UpsertUserScramCredential(inner) = response {
1152 inner.map_err(Into::into)
1153 } else {
1154 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1155 }
1156 })
1157 .map_err(Into::into)
1158 }
1159
1160 #[instrument(skip_all)]
1161 async fn user_scram_credential(
1162 &self,
1163 user: &str,
1164 mechanism: ScramMechanism,
1165 ) -> Result<Option<ScramCredential>> {
1166 let user = user.to_string();
1167
1168 self.serve(
1169 Context::default(),
1170 Request::UserScramCredential { user, mechanism },
1171 )
1172 .await
1173 .and_then(|response| {
1174 if let Response::UserScramCredential(inner) = response {
1175 inner.map_err(Into::into)
1176 } else {
1177 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1178 }
1179 })
1180 .map_err(Into::into)
1181 }
1182
1183 #[instrument(skip_all)]
1184 async fn ping(&self) -> Result<()> {
1185 self.serve(Context::default(), Request::Ping)
1186 .await
1187 .and_then(|response| {
1188 if let Response::Ping(inner) = response {
1189 inner.map_err(Into::into)
1190 } else {
1191 Err(Error::UnexpectedServiceResponse(Box::new(response)).into())
1192 }
1193 })
1194 .map_err(Into::into)
1195 }
1196}
1197
1198#[derive(Clone, Debug, Default)]
1199pub struct ChannelRequestLayer {
1200 cancellation: CancellationToken,
1201}
1202
1203impl ChannelRequestLayer {
1204 pub fn new(cancellation: CancellationToken) -> Self {
1205 Self { cancellation }
1206 }
1207}
1208
1209impl<S> Layer<S> for ChannelRequestLayer {
1210 type Service = ChannelRequestService<S>;
1211
1212 fn layer(&self, inner: S) -> Self::Service {
1213 Self::Service {
1214 inner,
1215 cancellation: self.cancellation.clone(),
1216 }
1217 }
1218}
1219
1220#[derive(Clone, Debug, Default)]
1221pub struct ChannelRequestService<S> {
1222 inner: S,
1223 cancellation: CancellationToken,
1224}
1225
1226impl<S, State> Service<State, RequestReceiver> for ChannelRequestService<S>
1227where
1228 S: Service<State, Request, Response = Response, Error = Error>,
1229 State: Clone + Send + Sync + 'static,
1230{
1231 type Response = ();
1232 type Error = Error;
1233
1234 async fn serve(
1235 &self,
1236 ctx: Context<State>,
1237 mut req: RequestReceiver,
1238 ) -> Result<Self::Response, Self::Error> {
1239 loop {
1240 tokio::select! {
1241 Some((request, tx)) = req.recv() => {
1242 self.inner
1243 .serve(ctx.clone(), request)
1244 .await
1245 .and_then(|response| {
1246 tx.send(response).map_err(|_unsent| Error::UnableToSend)
1247 })?
1248 }
1249
1250 cancelled = self.cancellation.cancelled() => {
1251 debug!(?cancelled);
1252 break;
1253 }
1254 }
1255 }
1256
1257 Ok(())
1258 }
1259}
1260
/// Service that answers requests by delegating to the wrapped `storage`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RequestStorageService<G> {
    /// Storage backend every request is forwarded to.
    storage: G,
}
1265
1266impl<G> RequestStorageService<G>
1267where
1268 G: Storage,
1269{
1270 pub fn new(storage: G) -> Self {
1271 Self { storage }
1272 }
1273}
1274
1275impl<G, State> Service<State, Request> for RequestStorageService<G>
1276where
1277 G: Storage,
1278 State: Clone + Send + Sync + 'static,
1279{
1280 type Response = Response;
1281 type Error = Error;
1282
1283 async fn serve(
1284 &self,
1285 _ctx: Context<State>,
1286 req: Request,
1287 ) -> Result<Self::Response, Self::Error> {
1288 match req {
1289 Request::RegisterBroker(broker_registration) => Ok(Response::RegisterBroker(
1290 self.storage.register_broker(broker_registration).await,
1291 )),
1292 Request::IncrementalAlterResource(alter_configs_resource) => {
1293 Ok(Response::IncrementalAlterResponse(
1294 self.storage
1295 .incremental_alter_resource(alter_configs_resource)
1296 .await,
1297 ))
1298 }
1299 Request::CreateTopic {
1300 topic,
1301 validate_only,
1302 } => Ok(Response::CreateTopic(
1303 self.storage.create_topic(topic, validate_only).await,
1304 )),
1305 Request::DeleteRecords(delete_records_topics) => Ok(Response::DeleteRecords(
1306 self.storage
1307 .delete_records(&delete_records_topics[..])
1308 .await,
1309 )),
1310 Request::DeleteTopic(topic_id) => Ok(Response::DeleteTopic(
1311 self.storage.delete_topic(&topic_id).await,
1312 )),
1313 Request::Brokers => Ok(Response::Brokers(self.storage.brokers().await)),
1314 Request::Produce {
1315 transaction_id,
1316 topition,
1317 batch,
1318 } => Ok(Response::Produce(
1319 self.storage
1320 .produce(transaction_id.as_deref(), &topition, batch)
1321 .await,
1322 )),
1323 Request::Fetch {
1324 topition,
1325 offset,
1326 min_bytes,
1327 max_bytes,
1328 isolation,
1329 } => Ok(Response::Fetch(
1330 self.storage
1331 .fetch(&topition, offset, min_bytes, max_bytes, isolation)
1332 .await,
1333 )),
1334 Request::OffsetStage(topition) => Ok(Response::OffsetStage(
1335 self.storage.offset_stage(&topition).await,
1336 )),
1337 Request::ListOffsets {
1338 isolation_level,
1339 offsets,
1340 } => Ok(Response::ListOffsets(
1341 self.storage
1342 .list_offsets(isolation_level, &offsets[..])
1343 .await,
1344 )),
1345 Request::OffsetCommit {
1346 group_id,
1347 retention_time_ms,
1348 offsets,
1349 } => Ok(Response::OffsetCommit(
1350 self.storage
1351 .offset_commit(&group_id, retention_time_ms, &offsets[..])
1352 .await,
1353 )),
1354 Request::CommittedOffsetTopitions(group_id) => Ok(Response::CommittedOffsetTopitions(
1355 self.storage.committed_offset_topitions(&group_id).await,
1356 )),
1357 Request::OffsetFetch {
1358 group_id,
1359 topics,
1360 require_stable,
1361 } => Ok(Response::OffsetFetch(
1362 self.storage
1363 .offset_fetch(group_id.as_deref(), &topics[..], require_stable)
1364 .await,
1365 )),
1366 Request::Metadata(topic_ids) => Ok(Response::Metadata(
1367 self.storage.metadata(topic_ids.as_deref()).await,
1368 )),
1369 Request::DescribeConfig {
1370 name,
1371 resource,
1372 keys,
1373 } => Ok(Response::DescribeConfig(
1374 self.storage
1375 .describe_config(&name, resource, keys.as_deref())
1376 .await,
1377 )),
1378 Request::DescribeTopicPartitions {
1379 topics,
1380 partition_limit,
1381 cursor,
1382 } => Ok(Response::DescribeTopicPartitions(
1383 self.storage
1384 .describe_topic_partitions(topics.as_deref(), partition_limit, cursor)
1385 .await,
1386 )),
1387 Request::ListGroups(items) => Ok(Response::ListGroups(
1388 self.storage.list_groups(items.as_deref()).await,
1389 )),
1390 Request::DeleteGroups(items) => Ok(Response::DeleteGroups(
1391 self.storage.delete_groups(items.as_deref()).await,
1392 )),
1393 Request::DescribeGroups {
1394 group_ids,
1395 include_authorized_operations,
1396 } => Ok(Response::DescribeGroups(
1397 self.storage
1398 .describe_groups(group_ids.as_deref(), include_authorized_operations)
1399 .await,
1400 )),
1401 Request::UpdateGroup {
1402 group_id,
1403 detail,
1404 version,
1405 } => Ok(Response::UpdateGroup(
1406 self.storage.update_group(&group_id, detail, version).await,
1407 )),
1408 Request::InitProducer {
1409 transaction_id,
1410 transaction_timeout_ms,
1411 producer_id,
1412 producer_epoch,
1413 } => Ok(Response::InitProducer(
1414 self.storage
1415 .init_producer(
1416 transaction_id.as_deref(),
1417 transaction_timeout_ms,
1418 producer_id,
1419 producer_epoch,
1420 )
1421 .await,
1422 )),
1423 Request::TxnAddOffsets {
1424 transaction_id,
1425 producer_id,
1426 producer_epoch,
1427 group_id,
1428 } => Ok(Response::TxnAddOffsets(
1429 self.storage
1430 .txn_add_offsets(&transaction_id, producer_id, producer_epoch, &group_id)
1431 .await,
1432 )),
1433 Request::TxnAddPartitions(txn_add_partitions_request) => {
1434 Ok(Response::TxnAddPartitions(
1435 self.storage
1436 .txn_add_partitions(txn_add_partitions_request)
1437 .await,
1438 ))
1439 }
1440 Request::TxnOffsetCommit(txn_offset_commit_request) => Ok(Response::TxnOffsetCommit(
1441 self.storage
1442 .txn_offset_commit(txn_offset_commit_request)
1443 .await,
1444 )),
1445 Request::TxnEnd {
1446 transaction_id,
1447 producer_id,
1448 producer_epoch,
1449 committed,
1450 } => Ok(Response::TxnEnd(
1451 self.storage
1452 .txn_end(&transaction_id, producer_id, producer_epoch, committed)
1453 .await,
1454 )),
1455 Request::Maintain(now) => Ok(Response::Maintain(self.storage.maintain(now).await)),
1456 Request::ClusterId => Ok(Response::ClusterId(self.storage.cluster_id().await)),
1457 Request::Node => Ok(Response::Node(self.storage.node().await)),
1458 Request::AdvertisedListener => Ok(Response::AdvertisedListener(
1459 self.storage.advertised_listener().await,
1460 )),
1461 Request::DeleteUserScramCredential { user, mechanism } => {
1462 Ok(Response::DeleteUserScramCredential(
1463 self.storage
1464 .delete_user_scram_credential(&user[..], mechanism)
1465 .await,
1466 ))
1467 }
1468 Request::UpsertUserScramCredential {
1469 user,
1470 mechanism,
1471 credential,
1472 } => Ok(Response::UpsertUserScramCredential(
1473 self.storage
1474 .upsert_user_scram_credential(&user[..], mechanism, credential)
1475 .await,
1476 )),
1477 Request::UserScramCredential { user, mechanism } => Ok(Response::UserScramCredential(
1478 self.storage
1479 .user_scram_credential(&user[..], mechanism)
1480 .await,
1481 )),
1482 Request::Ping => Ok(Response::Ping(self.storage.ping().await)),
1483 }
1484 }
1485}