tansu_storage/service/delete_topics.rs
1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{
17 ApiKey, DeleteTopicsRequest, DeleteTopicsResponse, delete_topics_response::DeletableTopicResult,
18};
19use tracing::instrument;
20
21use crate::{Error, Result, Storage};
22
/// A [`Service`] that takes a [`DeleteTopicsRequest`] and returns a
/// [`DeleteTopicsResponse`], using the [`Storage`] held as [`Context`] state.
///
/// # Examples
///
/// Asking for the deletion of a topic that was never created reports
/// `ErrorCode::UnknownTopicOrPartition` for that topic:
///
/// ```
/// use rama::{Context, Layer, Service as _, layer::MapStateLayer};
/// use tansu_sans_io::{DeleteTopicsRequest, DeleteTopicsResponse,
///     delete_topics_response::DeletableTopicResult, ErrorCode};
/// use tansu_storage::{DeleteTopicsService, Error, StorageContainer};
/// use url::Url;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Error> {
/// let storage = StorageContainer::builder()
///     .cluster_id("tansu")
///     .node_id(111)
///     .advertised_listener(Url::parse("tcp://localhost:9092")?)
///     .storage(Url::parse("memory://tansu/")?)
///     .build()
///     .await?;
///
/// let service = MapStateLayer::new(|_| storage).into_layer(DeleteTopicsService);
///
/// let topic = "pqr";
///
/// let error_code = ErrorCode::UnknownTopicOrPartition;
///
/// let expected = DeleteTopicsResponse::default()
///     .throttle_time_ms(Some(0))
///     .responses(Some(vec![
///         DeletableTopicResult::default()
///             .error_code(error_code.into())
///             .error_message(Some(error_code.to_string()))
///             .name(Some(topic.into())),
///     ]));
///
/// let actual = service
///     .serve(
///         Context::default(),
///         DeleteTopicsRequest::default().topic_names(Some(vec![topic.into()])),
///     )
///     .await?;
///
/// assert_eq!(expected, actual);
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DeleteTopicsService;
68
69impl ApiKey for DeleteTopicsService {
70 const KEY: i16 = DeleteTopicsRequest::KEY;
71}
72
73impl<G> Service<G, DeleteTopicsRequest> for DeleteTopicsService
74where
75 G: Storage,
76{
77 type Response = DeleteTopicsResponse;
78 type Error = Error;
79
80 #[instrument(skip(ctx, req))]
81 async fn serve(
82 &self,
83 ctx: Context<G>,
84 req: DeleteTopicsRequest,
85 ) -> Result<Self::Response, Self::Error> {
86 let mut responses = vec![];
87
88 for topic in req.topics.unwrap_or_default() {
89 let error_code = ctx.state().delete_topic(&topic.clone().into()).await?;
90 responses.push(
91 DeletableTopicResult::default()
92 .name(topic.name.clone())
93 .topic_id(Some(topic.topic_id))
94 .error_code(i16::from(error_code))
95 .error_message(Some(error_code.to_string())),
96 );
97 }
98
99 for topic in req.topic_names.unwrap_or_default() {
100 let error_code = ctx.state().delete_topic(&topic.clone().into()).await?;
101
102 responses.push(
103 DeletableTopicResult::default()
104 .name(Some(topic))
105 .topic_id(None)
106 .error_code(i16::from(error_code))
107 .error_message(Some(error_code.to_string())),
108 );
109 }
110
111 Ok(DeleteTopicsResponse::default()
112 .throttle_time_ms(Some(0))
113 .responses(Some(responses)))
114 }
115}