Skip to main content

tansu_storage/service/
delete_topics.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{
17    ApiKey, DeleteTopicsRequest, DeleteTopicsResponse, delete_topics_response::DeletableTopicResult,
18};
19use tracing::instrument;
20
21use crate::{Error, Result, Storage};
22
23/// A [`Service`] using [`Storage`] as [`Context`] taking [`DeleteTopicsRequest`] returning [`DeleteTopicsResponse`].
24/// ```
25/// use rama::{Context, Layer, Service as _, layer::MapStateLayer};
26/// use tansu_sans_io::{DeleteTopicsRequest, DeleteTopicsResponse,
27///     delete_topics_response::DeletableTopicResult, ErrorCode};
28/// use tansu_storage::{DeleteTopicsService, Error, StorageContainer};
29/// use url::Url;
30///
31/// # #[tokio::main]
32/// # async fn main() -> Result<(), Error> {
33/// let storage = StorageContainer::builder()
34///     .cluster_id("tansu")
35///     .node_id(111)
36///     .advertised_listener(Url::parse("tcp://localhost:9092")?)
37///     .storage(Url::parse("memory://tansu/")?)
38///     .build()
39///     .await?;
40///
41/// let service = MapStateLayer::new(|_| storage).into_layer(DeleteTopicsService);
42///
43/// let topic = "pqr";
44///
45/// let error_code = ErrorCode::UnknownTopicOrPartition;
46///
47/// assert_eq!(
48///     DeleteTopicsResponse::default()
49///         .throttle_time_ms(Some(0))
50///         .responses(Some(vec![
51///             DeletableTopicResult::default()
52///                 .error_code(error_code.into())
53///                 .error_message(Some(error_code.to_string()))
54///                 .name(Some(topic.into())),
55///         ])),
56///     service
57///         .serve(
58///             Context::default(),
59///             DeleteTopicsRequest::default().topic_names(Some(vec![topic.into()]))
60///         )
61///         .await?
62/// );
63/// # Ok(())
64/// # }
65/// ```
66#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
67pub struct DeleteTopicsService;
68
69impl ApiKey for DeleteTopicsService {
70    const KEY: i16 = DeleteTopicsRequest::KEY;
71}
72
73impl<G> Service<G, DeleteTopicsRequest> for DeleteTopicsService
74where
75    G: Storage,
76{
77    type Response = DeleteTopicsResponse;
78    type Error = Error;
79
80    #[instrument(skip(ctx, req))]
81    async fn serve(
82        &self,
83        ctx: Context<G>,
84        req: DeleteTopicsRequest,
85    ) -> Result<Self::Response, Self::Error> {
86        let mut responses = vec![];
87
88        for topic in req.topics.unwrap_or_default() {
89            let error_code = ctx.state().delete_topic(&topic.clone().into()).await?;
90            responses.push(
91                DeletableTopicResult::default()
92                    .name(topic.name.clone())
93                    .topic_id(Some(topic.topic_id))
94                    .error_code(i16::from(error_code))
95                    .error_message(Some(error_code.to_string())),
96            );
97        }
98
99        for topic in req.topic_names.unwrap_or_default() {
100            let error_code = ctx.state().delete_topic(&topic.clone().into()).await?;
101
102            responses.push(
103                DeletableTopicResult::default()
104                    .name(Some(topic))
105                    .topic_id(None)
106                    .error_code(i16::from(error_code))
107                    .error_message(Some(error_code.to_string())),
108            );
109        }
110
111        Ok(DeleteTopicsResponse::default()
112            .throttle_time_ms(Some(0))
113            .responses(Some(responses)))
114    }
115}