tansu_storage/service/consumer_group_describe.rs
1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{
17 ApiKey, ConsumerGroupDescribeRequest,
18 consumer_group_describe_response::{ConsumerGroupDescribeResponse, DescribedGroup},
19};
20use tracing::{debug, instrument};
21
22use crate::{Error, Result, Storage};
23
24/// A [`Service`] using [`Storage`] as [`Context`] taking [`ConsumerGroupDescribeRequest`] returning [`ConsumerGroupDescribeResponse`].
25/// ```
26/// use rama::{Context, Layer, Service as _, layer::MapStateLayer};
27/// use tansu_sans_io::{ConsumerGroupDescribeRequest, ErrorCode};
28/// use tansu_storage::{ConsumerGroupDescribeService, Error, StorageContainer};
29/// use url::Url;
30///
31/// # #[tokio::main]
32/// # async fn main() -> Result<(), Error> {
33/// let storage = StorageContainer::builder()
34/// .cluster_id("tansu")
35/// .node_id(111)
36/// .advertised_listener(Url::parse("tcp://localhost:9092")?)
37/// .storage(Url::parse("memory://tansu/")?)
38/// .build()
39/// .await?;
40///
41/// let service = MapStateLayer::new(|_| storage).into_layer(ConsumerGroupDescribeService);
42///
43/// let group_id = "abc";
44///
45/// let response = service
46/// .serve(
47/// Context::default(),
48/// ConsumerGroupDescribeRequest::default()
49/// .group_ids(Some([group_id.into()].into()))
50/// .include_authorized_operations(false),
51/// )
52/// .await?;
53///
54/// let groups = response.groups.unwrap_or_default();
55/// assert_eq!(1, groups.len());
56/// assert_eq!(ErrorCode::None, ErrorCode::try_from(groups[0].error_code)?);
57/// assert_eq!(group_id, groups[0].group_id.as_str());
58/// assert_eq!("Empty", groups[0].group_state.as_str());
59/// # Ok(())
60/// # }
61/// ```
62#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
63pub struct ConsumerGroupDescribeService;
64
65impl ApiKey for ConsumerGroupDescribeService {
66 const KEY: i16 = ConsumerGroupDescribeRequest::KEY;
67}
68
69impl<G> Service<G, ConsumerGroupDescribeRequest> for ConsumerGroupDescribeService
70where
71 G: Storage,
72{
73 type Response = ConsumerGroupDescribeResponse;
74 type Error = Error;
75
76 #[instrument(skip(ctx, req))]
77 async fn serve(
78 &self,
79 ctx: Context<G>,
80 req: ConsumerGroupDescribeRequest,
81 ) -> Result<Self::Response, Self::Error> {
82 ctx.state()
83 .describe_groups(req.group_ids.as_deref(), req.include_authorized_operations)
84 .await
85 .inspect(|described| debug!(?described))
86 .map(|described| {
87 described
88 .iter()
89 .map(DescribedGroup::from)
90 .collect::<Vec<_>>()
91 })
92 .map(Some)
93 .map(|groups| {
94 ConsumerGroupDescribeResponse::default()
95 .throttle_time_ms(0)
96 .groups(groups)
97 })
98 }
99}