use rama::{Context, Service};
use tansu_sans_io::{ApiKey, DeleteGroupsRequest, DeleteGroupsResponse};
use tracing::instrument;
use crate::{Error, Result, Storage};
/// Stateless service that answers a [`DeleteGroupsRequest`] with a
/// [`DeleteGroupsResponse`].
///
/// It carries no data of its own; the work is delegated to the [`Storage`]
/// held as the state of the request [`Context`].
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct DeleteGroupsService;
/// Gives the service the same API key as the request type it handles.
impl ApiKey for DeleteGroupsService {
/// Taken directly from [`DeleteGroupsRequest::KEY`] rather than repeated as
/// a literal, so the service and the request always agree.
const KEY: i16 = DeleteGroupsRequest::KEY;
}
/// Serves [`DeleteGroupsRequest`]s for any context state `G` that implements
/// [`Storage`].
impl<G> Service<G, DeleteGroupsRequest> for DeleteGroupsService
where
    G: Storage,
{
    type Response = DeleteGroupsResponse;
    type Error = Error;

    /// Passes the requested group names to [`Storage::delete_groups`] on the
    /// context state and wraps the returned results in a response.
    ///
    /// The response always carries a throttle time of `0` and `Some(results)`.
    ///
    /// # Errors
    ///
    /// Any error returned by the storage call is propagated unchanged.
    #[instrument(skip(ctx, req))]
    async fn serve(
        &self,
        ctx: Context<G>,
        req: DeleteGroupsRequest,
    ) -> Result<Self::Response, Self::Error> {
        // Borrow the optional list of names as a slice; `None` is forwarded as-is.
        let group_names = req.groups_names.as_deref();
        let outcomes = ctx.state().delete_groups(group_names).await?;

        Ok(DeleteGroupsResponse::default()
            .throttle_time_ms(0)
            .results(Some(outcomes)))
    }
}
/// Tests for [`DeleteGroupsService`]; only built for test runs with the
/// `dynostore` feature enabled.
#[cfg(all(test, feature = "dynostore"))]
mod tests {
    use rama::{Context, Layer as _, Service, layer::MapStateLayer};
    use tansu_sans_io::{DeleteGroupsRequest, ErrorCode};
    use tracing::subscriber::DefaultGuard;
    use tracing_subscriber::EnvFilter;
    use url::Url;

    use crate::{DeleteGroupsService, Error, StorageContainer};

    /// Installs a `tracing` subscriber that writes to
    /// `../logs/<CARGO_PKG_NAME>/<thread name>.log`.
    ///
    /// The filter starts from the default environment filter and adds a
    /// `<crate name>=debug` directive (with `-` replaced by `_` in the crate
    /// name). The returned guard has to be kept alive by the caller for as
    /// long as the subscriber should stay installed.
    ///
    /// # Errors
    ///
    /// Fails when the directive does not parse, when the current thread has no
    /// name, or when the log file cannot be created. `File::create` does not
    /// create missing parent directories, so the log directory must already
    /// exist.
    fn init_tracing() -> Result<DefaultGuard, Error> {
        use std::{fs::File, sync::Arc, thread};

        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(EnvFilter::from_default_env().add_directive(
                    format!("{}=debug", env!("CARGO_PKG_NAME").replace("-", "_")).parse()?,
                ))
                .with_writer(
                    thread::current()
                        .name()
                        // Build the error lazily: the message is only allocated
                        // when the thread really is unnamed.
                        .ok_or_else(|| Error::Message(String::from("unnamed thread")))
                        .and_then(|name| {
                            File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME")))
                                .map_err(Into::into)
                        })
                        .map(Arc::new)?,
                )
                .finish(),
        ))
    }

    /// Deleting a group that was never created still yields exactly one
    /// result entry, echoing the requested group id.
    #[tokio::test]
    async fn delete_non_existent() -> Result<(), Error> {
        let _guard = init_tracing()?;

        let storage = StorageContainer::builder()
            .cluster_id("tansu")
            .node_id(111)
            .advertised_listener(Url::parse("tcp://localhost:9092")?)
            .storage(Url::parse("memory://tansu/")?)
            .build()
            .await?;

        // Inject the storage as the context state seen by the service.
        let service = MapStateLayer::new(|_| storage).into_layer(DeleteGroupsService);

        let group_id = "abcba";

        let response = service
            .serve(
                Context::default(),
                DeleteGroupsRequest::default().groups_names(Some([group_id.into()].into())),
            )
            .await?;

        let results = response.results.unwrap_or_default();
        assert_eq!(1, results.len());
        assert_eq!(group_id, results[0].group_id.as_str());
        // NOTE(review): this pins `ErrorCode::None` for a group that was never
        // created; confirm that is the intended contract rather than a
        // "not found" style error code.
        assert_eq!(ErrorCode::None, ErrorCode::try_from(results[0].error_code)?);

        Ok(())
    }
}