tansu_storage/service/create_topics.rs
1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{
17 ApiKey, CreateTopicsRequest, CreateTopicsResponse, ErrorCode, NULL_TOPIC_ID,
18 create_topics_response::CreatableTopicResult,
19};
20use tracing::{debug, instrument};
21
22use crate::{Error, Result, Storage};
23
24/// A [`Service`] using [`Storage`] as [`Context`] taking [`CreateTopicsRequest`] returning [`CreateTopicsResponse`].
25/// ```
26/// use rama::{Context, Layer, Service as _, layer::MapStateLayer};
27/// use tansu_sans_io::{NULL_TOPIC_ID, CreateTopicsRequest,
28/// create_topics_request::CreatableTopic, ErrorCode};
29/// use tansu_storage::{CreateTopicsService, Error, StorageContainer};
30/// use url::Url;
31///
32/// # #[tokio::main]
33/// # async fn main() -> Result<(), Error> {
34/// let storage = StorageContainer::builder()
35/// .cluster_id("tansu")
36/// .node_id(111)
37/// .advertised_listener(Url::parse("tcp://localhost:9092")?)
38/// .storage(Url::parse("memory://tansu/")?)
39/// .build()
40/// .await?;
41///
42/// let service = MapStateLayer::new(|_| storage).into_layer(CreateTopicsService);
43///
44/// let name = "abcba";
45///
46/// let response = service
47/// .serve(
48/// Context::default(),
49/// CreateTopicsRequest::default()
50/// .topics(Some(vec![
51/// CreatableTopic::default()
52/// .name(name.into())
53/// .num_partitions(1)
54/// .replication_factor(3)
55/// .assignments(Some([].into()))
56/// .configs(Some([].into())),
57/// ]))
58/// .validate_only(Some(false)),
59/// )
60/// .await?;
61///
62/// let topics = response.topics.unwrap_or_default();
63///
64/// assert_eq!(1, topics.len());
65/// assert_eq!(name, topics[0].name.as_str());
66/// assert_ne!(Some(NULL_TOPIC_ID), topics[0].topic_id);
67/// assert_eq!(Some(1), topics[0].num_partitions);
68/// assert_eq!(Some(3), topics[0].replication_factor);
69/// assert_eq!(ErrorCode::None, ErrorCode::try_from(topics[0].error_code)?);
70/// # Ok(())
71/// # }
72/// ```
73#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
74pub struct CreateTopicsService;
75
76impl ApiKey for CreateTopicsService {
77 const KEY: i16 = CreateTopicsRequest::KEY;
78}
79
80impl<G> Service<G, CreateTopicsRequest> for CreateTopicsService
81where
82 G: Storage,
83{
84 type Response = CreateTopicsResponse;
85 type Error = Error;
86
87 #[instrument(skip(ctx, req))]
88 async fn serve(
89 &self,
90 ctx: Context<G>,
91 req: CreateTopicsRequest,
92 ) -> Result<Self::Response, Self::Error> {
93 let mut topics = vec![];
94
95 for mut topic in req.topics.unwrap_or_default() {
96 let name = topic.name.clone();
97
98 let num_partitions = Some(match topic.num_partitions {
99 -1 => {
100 topic.num_partitions = 3;
101 topic.num_partitions
102 }
103 otherwise => otherwise,
104 });
105
106 let replication_factor = Some(match topic.replication_factor {
107 -1 => {
108 topic.replication_factor = 1;
109 topic.replication_factor
110 }
111 otherwise => otherwise,
112 });
113
114 match ctx
115 .state()
116 .create_topic(topic, req.validate_only.unwrap_or_default())
117 .await
118 {
119 Ok(topic_id) => {
120 debug!(?topic_id);
121
122 topics.push(
123 CreatableTopicResult::default()
124 .name(name)
125 .topic_id(Some(topic_id.into_bytes()))
126 .error_code(ErrorCode::None.into())
127 .error_message(None)
128 .topic_config_error_code(Some(ErrorCode::None.into()))
129 .num_partitions(num_partitions)
130 .replication_factor(replication_factor)
131 .configs(Some([].into())),
132 );
133 }
134
135 Err(Error::Api(error_code)) => topics.push(
136 CreatableTopicResult::default()
137 .name(name)
138 .topic_id(Some(NULL_TOPIC_ID))
139 .error_code(error_code.into())
140 .error_message(Some(error_code.to_string()))
141 .topic_config_error_code(None)
142 .num_partitions(num_partitions)
143 .replication_factor(replication_factor)
144 .configs(Some([].into())),
145 ),
146
147 Err(error) => {
148 debug!(?error);
149
150 topics.push(
151 CreatableTopicResult::default()
152 .name(name)
153 .topic_id(Some(NULL_TOPIC_ID))
154 .error_code(ErrorCode::UnknownServerError.into())
155 .error_message(None)
156 .topic_config_error_code(None)
157 .num_partitions(None)
158 .replication_factor(None)
159 .configs(Some([].into())),
160 )
161 }
162 }
163 }
164
165 Ok(CreateTopicsResponse::default()
166 .topics(Some(topics))
167 .throttle_time_ms(Some(0)))
168 }
169}