Skip to main content

tansu_storage/service/
describe_topic_partitions.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{ApiKey, DescribeTopicPartitionsRequest, DescribeTopicPartitionsResponse};
17use tracing::instrument;
18
19use crate::{Error, Result, Storage, TopicId};
20
/// Kafka `DescribeTopicPartitions` handler backed by [`Storage`].
///
/// This is a [`Service`] that reads its [`Storage`] implementation from the
/// state of the [`Context`], accepts a [`DescribeTopicPartitionsRequest`] and
/// produces a [`DescribeTopicPartitionsResponse`].
///
/// # Examples
///
/// Describing a topic that has not been created in an in-memory store: the
/// example expects the topic to be echoed back by name with an
/// `UnknownTopicOrPartition` error code.
///
/// ```
/// use rama::{Context, Layer as _, Service, layer::MapStateLayer};
/// use tansu_sans_io::{
///     DescribeTopicPartitionsRequest, ErrorCode,
///     describe_topic_partitions_request::TopicRequest,
/// };
/// use tansu_storage::{DescribeTopicPartitionsService, Error, StorageContainer};
/// use url::Url;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Error> {
/// let storage = StorageContainer::builder()
///     .cluster_id("tansu")
///     .node_id(111)
///     .advertised_listener(Url::parse("tcp://localhost:9092")?)
///     .storage(Url::parse("memory://tansu/")?)
///     .build()
///     .await?;
///
/// let service = MapStateLayer::new(|_| storage).into_layer(DescribeTopicPartitionsService);
///
/// let topic = "abcba";
///
/// let response = service
///     .serve(
///         Context::default(),
///         DescribeTopicPartitionsRequest::default()
///             .topics(Some([TopicRequest::default().name(topic.into())].into())),
///     )
///     .await?;
///
/// let topics = response.topics.unwrap_or_default();
/// assert_eq!(1, topics.len());
/// assert_eq!(
///     ErrorCode::UnknownTopicOrPartition,
///     ErrorCode::try_from(topics[0].error_code)?
/// );
/// assert_eq!(Some(topic), topics[0].name.as_deref());
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DescribeTopicPartitionsService;
65
66impl ApiKey for DescribeTopicPartitionsService {
67    const KEY: i16 = DescribeTopicPartitionsRequest::KEY;
68}
69
70impl<G> Service<G, DescribeTopicPartitionsRequest> for DescribeTopicPartitionsService
71where
72    G: Storage,
73{
74    type Response = DescribeTopicPartitionsResponse;
75    type Error = Error;
76
77    #[instrument(skip(ctx, req))]
78    async fn serve(
79        &self,
80        ctx: Context<G>,
81        req: DescribeTopicPartitionsRequest,
82    ) -> Result<Self::Response, Self::Error> {
83        ctx.state()
84            .describe_topic_partitions(
85                req.topics
86                    .as_ref()
87                    .map(|topics| topics.iter().map(TopicId::from).collect::<Vec<_>>())
88                    .as_deref(),
89                req.response_partition_limit,
90                req.cursor.map(Into::into),
91            )
92            .await
93            .map(|topics| {
94                DescribeTopicPartitionsResponse::default()
95                    .throttle_time_ms(0)
96                    .topics(Some(topics))
97                    .next_cursor(None)
98            })
99    }
100}