Skip to main content

tansu_storage/service/
create_topics.rs

// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{
17    ApiKey, CreateTopicsRequest, CreateTopicsResponse, ErrorCode, NULL_TOPIC_ID,
18    create_topics_response::CreatableTopicResult,
19};
20use tracing::{debug, instrument};
21
22use crate::{Error, Result, Storage};
23
/// A [`Service`] that creates topics: it takes a [`CreateTopicsRequest`], uses the
/// [`Storage`] held as [`Context`] state, and returns a [`CreateTopicsResponse`].
///
/// Every topic in the request gets its own entry in the response. A topic that
/// cannot be created is reported through the `error_code` of its entry rather
/// than by failing the whole call.
///
/// Before a topic is handed to [`Storage`], a requested partition count of `-1`
/// is replaced with 3 and a requested replication factor of `-1` is replaced
/// with 1.
///
/// # Examples
///
/// ```
/// use rama::{Context, Layer, Service as _, layer::MapStateLayer};
/// use tansu_sans_io::{NULL_TOPIC_ID, CreateTopicsRequest,
///     create_topics_request::CreatableTopic, ErrorCode};
/// use tansu_storage::{CreateTopicsService, Error, StorageContainer};
/// use url::Url;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Error> {
/// let storage = StorageContainer::builder()
///     .cluster_id("tansu")
///     .node_id(111)
///     .advertised_listener(Url::parse("tcp://localhost:9092")?)
///     .storage(Url::parse("memory://tansu/")?)
///     .build()
///     .await?;
///
/// let service = MapStateLayer::new(|_| storage).into_layer(CreateTopicsService);
///
/// let name = "abcba";
///
/// let response = service
///     .serve(
///         Context::default(),
///         CreateTopicsRequest::default()
///             .topics(Some(vec![
///                 CreatableTopic::default()
///                     .name(name.into())
///                     .num_partitions(1)
///                     .replication_factor(3)
///                     .assignments(Some([].into()))
///                     .configs(Some([].into())),
///             ]))
///             .validate_only(Some(false)),
///     )
///     .await?;
///
/// let topics = response.topics.unwrap_or_default();
///
/// assert_eq!(1, topics.len());
/// assert_eq!(name, topics[0].name.as_str());
/// assert_ne!(Some(NULL_TOPIC_ID), topics[0].topic_id);
/// assert_eq!(Some(1), topics[0].num_partitions);
/// assert_eq!(Some(3), topics[0].replication_factor);
/// assert_eq!(ErrorCode::None, ErrorCode::try_from(topics[0].error_code)?);
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct CreateTopicsService;
75
76impl ApiKey for CreateTopicsService {
77    const KEY: i16 = CreateTopicsRequest::KEY;
78}
79
80impl<G> Service<G, CreateTopicsRequest> for CreateTopicsService
81where
82    G: Storage,
83{
84    type Response = CreateTopicsResponse;
85    type Error = Error;
86
87    #[instrument(skip(ctx, req))]
88    async fn serve(
89        &self,
90        ctx: Context<G>,
91        req: CreateTopicsRequest,
92    ) -> Result<Self::Response, Self::Error> {
93        let mut topics = vec![];
94
95        for mut topic in req.topics.unwrap_or_default() {
96            let name = topic.name.clone();
97
98            let num_partitions = Some(match topic.num_partitions {
99                -1 => {
100                    topic.num_partitions = 3;
101                    topic.num_partitions
102                }
103                otherwise => otherwise,
104            });
105
106            let replication_factor = Some(match topic.replication_factor {
107                -1 => {
108                    topic.replication_factor = 1;
109                    topic.replication_factor
110                }
111                otherwise => otherwise,
112            });
113
114            match ctx
115                .state()
116                .create_topic(topic, req.validate_only.unwrap_or_default())
117                .await
118            {
119                Ok(topic_id) => {
120                    debug!(?topic_id);
121
122                    topics.push(
123                        CreatableTopicResult::default()
124                            .name(name)
125                            .topic_id(Some(topic_id.into_bytes()))
126                            .error_code(ErrorCode::None.into())
127                            .error_message(None)
128                            .topic_config_error_code(Some(ErrorCode::None.into()))
129                            .num_partitions(num_partitions)
130                            .replication_factor(replication_factor)
131                            .configs(Some([].into())),
132                    );
133                }
134
135                Err(Error::Api(error_code)) => topics.push(
136                    CreatableTopicResult::default()
137                        .name(name)
138                        .topic_id(Some(NULL_TOPIC_ID))
139                        .error_code(error_code.into())
140                        .error_message(Some(error_code.to_string()))
141                        .topic_config_error_code(None)
142                        .num_partitions(num_partitions)
143                        .replication_factor(replication_factor)
144                        .configs(Some([].into())),
145                ),
146
147                Err(error) => {
148                    debug!(?error);
149
150                    topics.push(
151                        CreatableTopicResult::default()
152                            .name(name)
153                            .topic_id(Some(NULL_TOPIC_ID))
154                            .error_code(ErrorCode::UnknownServerError.into())
155                            .error_message(None)
156                            .topic_config_error_code(None)
157                            .num_partitions(None)
158                            .replication_factor(None)
159                            .configs(Some([].into())),
160                    )
161                }
162            }
163        }
164
165        Ok(CreateTopicsResponse::default()
166            .topics(Some(topics))
167            .throttle_time_ms(Some(0)))
168    }
169}