tansu_storage/service/describe_groups.rs
1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{
17 ApiKey, DescribeGroupsRequest, DescribeGroupsResponse, describe_groups_response::DescribedGroup,
18};
19use tracing::instrument;
20
21use crate::{Error, Result, Storage};
22
23/// A [`Service`] using [`Storage`] as [`Context`] taking [`DescribeGroupsRequest`] returning [`DescribeGroupsResponse`].
24/// ```
25/// use rama::{Context, Layer as _, Service, layer::MapStateLayer};
26/// use tansu_sans_io::{DescribeGroupsRequest, ErrorCode};
27/// use tansu_storage::{DescribeGroupsService, Error, StorageContainer};
28/// use url::Url;
29///
30/// # #[tokio::main]
31/// # async fn main() -> Result<(), Error> {
32/// let storage = StorageContainer::builder()
33/// .cluster_id("tansu")
34/// .node_id(111)
35/// .advertised_listener(Url::parse("tcp://localhost:9092")?)
36/// .storage(Url::parse("memory://tansu/")?)
37/// .build()
38/// .await?;
39///
40/// let service = MapStateLayer::new(|_| storage).into_layer(DescribeGroupsService);
41///
42/// let group_id = "abcba";
43///
44/// let response = service
45/// .serve(
46/// Context::default(),
47/// DescribeGroupsRequest::default()
48/// .groups(Some([group_id.into()].into()))
49/// .include_authorized_operations(Some(false)),
50/// )
51/// .await?;
52///
53/// let groups = response.groups.unwrap_or_default();
54/// assert_eq!(1, groups.len());
55/// assert_eq!(ErrorCode::None, ErrorCode::try_from(groups[0].error_code)?);
56/// assert_eq!(group_id, groups[0].group_id.as_str());
57/// assert_eq!("Empty", groups[0].group_state.as_str());
58/// # Ok(())
59/// # }
60/// ```
61#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
62pub struct DescribeGroupsService;
63
64impl ApiKey for DescribeGroupsService {
65 const KEY: i16 = DescribeGroupsRequest::KEY;
66}
67
68impl<G> Service<G, DescribeGroupsRequest> for DescribeGroupsService
69where
70 G: Storage,
71{
72 type Response = DescribeGroupsResponse;
73 type Error = Error;
74
75 #[instrument(skip(ctx, req))]
76 async fn serve(
77 &self,
78 ctx: Context<G>,
79 req: DescribeGroupsRequest,
80 ) -> Result<Self::Response, Self::Error> {
81 ctx.state()
82 .describe_groups(
83 req.groups.as_deref(),
84 req.include_authorized_operations.unwrap_or(false),
85 )
86 .await
87 .map(|described| {
88 described
89 .iter()
90 .map(DescribedGroup::from)
91 .collect::<Vec<_>>()
92 })
93 .map(Some)
94 .map(|groups| {
95 DescribeGroupsResponse::default()
96 .throttle_time_ms(Some(0))
97 .groups(groups)
98 })
99 }
100}