tansu_storage/service/
delete_groups.rs1use rama::{Context, Service};
16use tansu_sans_io::{ApiKey, DeleteGroupsRequest, DeleteGroupsResponse};
17use tracing::instrument;
18
19use crate::{Error, Result, Storage};
20
/// Stateless service marker for the Kafka `DeleteGroups` API.
///
/// The type carries no data; see its `Service` implementation for how
/// a request is handled.
#[derive(
    Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd,
)]
pub struct DeleteGroupsService;
58
59impl ApiKey for DeleteGroupsService {
60 const KEY: i16 = DeleteGroupsRequest::KEY;
61}
62
63impl<G> Service<G, DeleteGroupsRequest> for DeleteGroupsService
64where
65 G: Storage,
66{
67 type Response = DeleteGroupsResponse;
68 type Error = Error;
69
70 #[instrument(skip(ctx, req))]
71 async fn serve(
72 &self,
73 ctx: Context<G>,
74 req: DeleteGroupsRequest,
75 ) -> Result<Self::Response, Self::Error> {
76 ctx.state()
77 .delete_groups(req.groups_names.as_deref())
78 .await
79 .map(Some)
80 .map(|results| {
81 DeleteGroupsResponse::default()
82 .throttle_time_ms(0)
83 .results(results)
84 })
85 }
86}
87
#[cfg(all(test, feature = "dynostore"))]
mod tests {
    use rama::{Context, Layer as _, Service, layer::MapStateLayer};
    use tansu_sans_io::{DeleteGroupsRequest, ErrorCode};
    use tracing::subscriber::DefaultGuard;
    use tracing_subscriber::EnvFilter;
    use url::Url;

    use crate::{DeleteGroupsService, Error, StorageContainer};

    /// Installs a default tracing subscriber that writes debug level output
    /// for this crate to `../logs/<crate>/<thread name>.log`.
    ///
    /// The subscriber stays active until the returned guard is dropped.
    /// Fails when the directive cannot be parsed, the current thread has no
    /// name, or the log file cannot be created.
    fn init_tracing() -> Result<DefaultGuard, Error> {
        use std::{fs::File, sync::Arc, thread};

        // Build the filter first so a bad directive is reported before any
        // log file is created.
        let filter = EnvFilter::from_default_env().add_directive(
            format!("{}=debug", env!("CARGO_PKG_NAME").replace("-", "_")).parse()?,
        );

        let current = thread::current();
        let Some(name) = current.name() else {
            return Err(Error::Message(String::from("unnamed thread")));
        };

        let log_file = File::create(format!(
            "../logs/{}/{name}.log",
            env!("CARGO_PKG_NAME"),
        ))?;

        let subscriber = tracing_subscriber::fmt()
            .with_level(true)
            .with_line_number(true)
            .with_thread_names(false)
            .with_env_filter(filter)
            .with_writer(Arc::new(log_file))
            .finish();

        Ok(tracing::subscriber::set_default(subscriber))
    }

    /// Deleting a group that was never created against in-memory storage
    /// yields exactly one result for that group with `ErrorCode::None`.
    #[tokio::test]
    async fn delete_non_existent() -> Result<(), Error> {
        let _guard = init_tracing()?;

        let advertised_listener = Url::parse("tcp://localhost:9092")?;
        let storage_url = Url::parse("memory://tansu/")?;

        let storage = StorageContainer::builder()
            .cluster_id("tansu")
            .node_id(111)
            .advertised_listener(advertised_listener)
            .storage(storage_url)
            .build()
            .await?;

        // Inject the storage container as the state seen by the service.
        let service = MapStateLayer::new(|_| storage).into_layer(DeleteGroupsService);

        let group_id = "abcba";
        let request =
            DeleteGroupsRequest::default().groups_names(Some([group_id.into()].into()));

        let response = service.serve(Context::default(), request).await?;

        let results = response.results.unwrap_or_default();
        assert_eq!(1, results.len());

        let outcome = &results[0];
        assert_eq!(group_id, outcome.group_id.as_str());
        assert_eq!(ErrorCode::None, ErrorCode::try_from(outcome.error_code)?);

        Ok(())
    }
}