tansu_storage/service/describe_topic_partitions.rs
1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{ApiKey, DescribeTopicPartitionsRequest, DescribeTopicPartitionsResponse};
17use tracing::instrument;
18
19use crate::{Error, Result, Storage, TopicId};
20
/// Describe-topic-partitions handler backed by [`Storage`].
///
/// This is a [`Service`] whose [`Context`] state is a [`Storage`]; it accepts a
/// [`DescribeTopicPartitionsRequest`] and produces a
/// [`DescribeTopicPartitionsResponse`].
///
/// # Examples
///
/// Asking an in-memory storage engine about a topic that was never created
/// yields a single topic entry carrying `UnknownTopicOrPartition`:
///
/// ```
/// use rama::{Context, Layer as _, Service, layer::MapStateLayer};
/// use tansu_sans_io::{
///     DescribeTopicPartitionsRequest, ErrorCode,
///     describe_topic_partitions_request::TopicRequest,
/// };
/// use tansu_storage::{DescribeTopicPartitionsService, Error, StorageContainer};
/// use url::Url;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Error> {
/// let storage = StorageContainer::builder()
///     .cluster_id("tansu")
///     .node_id(111)
///     .advertised_listener(Url::parse("tcp://localhost:9092")?)
///     .storage(Url::parse("memory://tansu/")?)
///     .build()
///     .await?;
///
/// let service = MapStateLayer::new(|_| storage).into_layer(DescribeTopicPartitionsService);
///
/// let topic = "abcba";
///
/// let response = service
///     .serve(
///         Context::default(),
///         DescribeTopicPartitionsRequest::default()
///             .topics(Some([TopicRequest::default().name(topic.into())].into())),
///     )
///     .await?;
///
/// let topics = response.topics.unwrap_or_default();
/// assert_eq!(1, topics.len());
/// assert_eq!(
///     ErrorCode::UnknownTopicOrPartition,
///     ErrorCode::try_from(topics[0].error_code)?
/// );
/// assert_eq!(Some(topic), topics[0].name.as_deref());
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DescribeTopicPartitionsService;
65
/// Associates this service with the API key of the request type it handles.
impl ApiKey for DescribeTopicPartitionsService {
    /// Same value as [`DescribeTopicPartitionsRequest::KEY`].
    const KEY: i16 = DescribeTopicPartitionsRequest::KEY;
}
69
70impl<G> Service<G, DescribeTopicPartitionsRequest> for DescribeTopicPartitionsService
71where
72 G: Storage,
73{
74 type Response = DescribeTopicPartitionsResponse;
75 type Error = Error;
76
77 #[instrument(skip(ctx, req))]
78 async fn serve(
79 &self,
80 ctx: Context<G>,
81 req: DescribeTopicPartitionsRequest,
82 ) -> Result<Self::Response, Self::Error> {
83 ctx.state()
84 .describe_topic_partitions(
85 req.topics
86 .as_ref()
87 .map(|topics| topics.iter().map(TopicId::from).collect::<Vec<_>>())
88 .as_deref(),
89 req.response_partition_limit,
90 req.cursor.map(Into::into),
91 )
92 .await
93 .map(|topics| {
94 DescribeTopicPartitionsResponse::default()
95 .throttle_time_ms(0)
96 .topics(Some(topics))
97 .next_cursor(None)
98 })
99 }
100}