Skip to main content

tansu_storage/service/
describe_groups.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{
17    ApiKey, DescribeGroupsRequest, DescribeGroupsResponse, describe_groups_response::DescribedGroup,
18};
19use tracing::instrument;
20
21use crate::{Error, Result, Storage};
22
/// A [`Service`] that answers a [`DescribeGroupsRequest`] with a [`DescribeGroupsResponse`],
/// using a [`Storage`] implementation as the [`Context`] state.
///
/// # Examples
///
/// Describe one group by id and inspect the single entry that comes back:
///
/// ```
/// use rama::{Context, Layer as _, Service, layer::MapStateLayer};
/// use tansu_sans_io::{DescribeGroupsRequest, ErrorCode};
/// use tansu_storage::{DescribeGroupsService, Error, StorageContainer};
/// use url::Url;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Error> {
/// let storage = StorageContainer::builder()
///     .cluster_id("tansu")
///     .node_id(111)
///     .advertised_listener(Url::parse("tcp://localhost:9092")?)
///     .storage(Url::parse("memory://tansu/")?)
///     .build()
///     .await?;
///
/// let service = MapStateLayer::new(|_| storage).into_layer(DescribeGroupsService);
///
/// let group_id = "abcba";
///
/// let response = service
///     .serve(
///         Context::default(),
///         DescribeGroupsRequest::default()
///             .groups(Some([group_id.into()].into()))
///             .include_authorized_operations(Some(false)),
///     )
///     .await?;
///
/// let groups = response.groups.unwrap_or_default();
/// assert_eq!(1, groups.len());
/// assert_eq!(ErrorCode::None, ErrorCode::try_from(groups[0].error_code)?);
/// assert_eq!(group_id, groups[0].group_id.as_str());
/// assert_eq!("Empty", groups[0].group_state.as_str());
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DescribeGroupsService;
63
/// Gives [`DescribeGroupsService`] the same [`ApiKey`] as the request type it serves.
impl ApiKey for DescribeGroupsService {
    /// Reuses [`DescribeGroupsRequest::KEY`] rather than repeating the numeric value here.
    const KEY: i16 = DescribeGroupsRequest::KEY;
}
67
68impl<G> Service<G, DescribeGroupsRequest> for DescribeGroupsService
69where
70    G: Storage,
71{
72    type Response = DescribeGroupsResponse;
73    type Error = Error;
74
75    #[instrument(skip(ctx, req))]
76    async fn serve(
77        &self,
78        ctx: Context<G>,
79        req: DescribeGroupsRequest,
80    ) -> Result<Self::Response, Self::Error> {
81        ctx.state()
82            .describe_groups(
83                req.groups.as_deref(),
84                req.include_authorized_operations.unwrap_or(false),
85            )
86            .await
87            .map(|described| {
88                described
89                    .iter()
90                    .map(DescribedGroup::from)
91                    .collect::<Vec<_>>()
92            })
93            .map(Some)
94            .map(|groups| {
95                DescribeGroupsResponse::default()
96                    .throttle_time_ms(Some(0))
97                    .groups(groups)
98            })
99    }
100}