Skip to main content

tansu_storage/service/
delete_groups.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rama::{Context, Service};
16use tansu_sans_io::{ApiKey, DeleteGroupsRequest, DeleteGroupsResponse};
17use tracing::instrument;
18
19use crate::{Error, Result, Storage};
20
/// Kafka `DeleteGroups` handler backed by [`Storage`].
///
/// This is a [`Service`] whose [`Context`] state is a [`Storage`]: it accepts a
/// [`DeleteGroupsRequest`], forwards the requested group names to
/// [`Storage::delete_groups`], and replies with a [`DeleteGroupsResponse`]
/// carrying the results reported by the storage.
///
/// # Examples
///
/// Asking an in-memory storage to delete a group yields a single result with
/// [`ErrorCode::None`](tansu_sans_io::ErrorCode::None) for that group:
///
/// ```
/// use rama::{Context, Layer, Service as _, layer::MapStateLayer};
/// use tansu_sans_io::{DeleteGroupsRequest, ErrorCode};
/// use tansu_storage::{DeleteGroupsService, Error, StorageContainer};
/// use url::Url;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Error> {
/// let storage = StorageContainer::builder()
///     .cluster_id("tansu")
///     .node_id(111)
///     .advertised_listener(Url::parse("tcp://localhost:9092")?)
///     .storage(Url::parse("memory://tansu/")?)
///     .build()
///     .await?;
///
/// let service = MapStateLayer::new(|_| storage).into_layer(DeleteGroupsService);
///
/// let group_id = "abcba";
///
/// let response = service
///     .serve(
///         Context::default(),
///         DeleteGroupsRequest::default().groups_names(Some([group_id.into()].into())),
///     )
///     .await?;
///
/// let results = response.results.unwrap_or_default();
/// assert_eq!(1, results.len());
/// assert_eq!(group_id, results[0].group_id.as_str());
/// assert_eq!(ErrorCode::None, ErrorCode::try_from(results[0].error_code)?);
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DeleteGroupsService;
58
59impl ApiKey for DeleteGroupsService {
60    const KEY: i16 = DeleteGroupsRequest::KEY;
61}
62
63impl<G> Service<G, DeleteGroupsRequest> for DeleteGroupsService
64where
65    G: Storage,
66{
67    type Response = DeleteGroupsResponse;
68    type Error = Error;
69
70    #[instrument(skip(ctx, req))]
71    async fn serve(
72        &self,
73        ctx: Context<G>,
74        req: DeleteGroupsRequest,
75    ) -> Result<Self::Response, Self::Error> {
76        ctx.state()
77            .delete_groups(req.groups_names.as_deref())
78            .await
79            .map(Some)
80            .map(|results| {
81                DeleteGroupsResponse::default()
82                    .throttle_time_ms(0)
83                    .results(results)
84            })
85    }
86}
87
/// Tests for [`DeleteGroupsService`]; only built for test runs with the
/// `dynostore` feature enabled.
#[cfg(all(test, feature = "dynostore"))]
mod tests {
    use rama::{Context, Layer as _, Service, layer::MapStateLayer};
    use tansu_sans_io::{DeleteGroupsRequest, ErrorCode};
    use tracing::subscriber::DefaultGuard;
    use tracing_subscriber::EnvFilter;
    use url::Url;

    use crate::{DeleteGroupsService, Error, StorageContainer};

    /// Installs a tracing subscriber via `tracing::subscriber::set_default`
    /// and returns its guard.
    ///
    /// Output is written to `../logs/<package name>/<current thread name>.log`
    /// at `debug` level for this crate, on top of whatever the environment
    /// filter already selects. The caller should hold on to the returned guard
    /// for the duration of the test.
    ///
    /// # Errors
    ///
    /// Fails when the current thread has no name, when the filter directive
    /// does not parse, or when the log file cannot be created (the log
    /// directory is not created here, so it must already exist).
    fn init_tracing() -> Result<DefaultGuard, Error> {
        use std::{fs::File, sync::Arc, thread};

        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(EnvFilter::from_default_env().add_directive(
                    format!("{}=debug", env!("CARGO_PKG_NAME").replace("-", "_")).parse()?,
                ))
                .with_writer(
                    thread::current()
                        .name()
                        // Build the error lazily so the message is only
                        // allocated when the thread really is unnamed.
                        .ok_or_else(|| Error::Message(String::from("unnamed thread")))
                        .and_then(|name| {
                            File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME"),))
                                .map_err(Into::into)
                        })
                        .map(Arc::new)?,
                )
                .finish(),
        ))
    }

    /// Deleting a group against a fresh in-memory storage returns exactly one
    /// result for that group, with an error code of [`ErrorCode::None`].
    #[tokio::test]
    async fn delete_non_existent() -> Result<(), Error> {
        let _guard = init_tracing()?;

        let storage = StorageContainer::builder()
            .cluster_id("tansu")
            .node_id(111)
            .advertised_listener(Url::parse("tcp://localhost:9092")?)
            .storage(Url::parse("memory://tansu/")?)
            .build()
            .await?;

        let service = MapStateLayer::new(|_| storage).into_layer(DeleteGroupsService);

        let group_id = "abcba";

        let response = service
            .serve(
                Context::default(),
                DeleteGroupsRequest::default().groups_names(Some([group_id.into()].into())),
            )
            .await?;

        let results = response.results.unwrap_or_default();
        assert_eq!(1, results.len());
        assert_eq!(group_id, results[0].group_id.as_str());
        assert_eq!(ErrorCode::None, ErrorCode::try_from(results[0].error_code)?);

        Ok(())
    }
}