use crate::common::{Error, init_tracing};
use nisshi_sans_io::{
CreateTopicsRequest, DeleteTopicsRequest, ErrorCode, MetadataRequest,
create_topics_request::CreatableTopic, metadata_request::MetadataRequestTopic,
};
use nisshi_storage::{CreateTopicsService, DeleteTopicsService, MetadataService, StorageContainer};
use rama::{Context, Layer as _, Service, layer::MapStateLayer};
use rand::{distr::Alphanumeric, prelude::*, rng};
use url::Url;
use uuid::Uuid;
mod common;
#[tokio::test]
async fn topic_lifecycle() -> Result<(), Error> {
let _guard = init_tracing()?;
let cluster_id = Uuid::now_v7().to_string();
let node_id = rng().random_range(0..i32::MAX);
const HOST: &str = "localhost";
const PORT: i32 = 9092;
let storage = StorageContainer::builder()
.cluster_id(cluster_id)
.node_id(node_id)
.advertised_listener(Url::parse(&format!("tcp://{HOST}:{PORT}"))?)
.storage(Url::parse("memory://nisshi/")?)
.build()
.await?;
let create_topic = {
let storage = storage.clone();
MapStateLayer::new(|_| storage).into_layer(CreateTopicsService)
};
let delete_topic = {
let storage = storage.clone();
MapStateLayer::new(|_| storage).into_layer(DeleteTopicsService)
};
let metadata = {
let storage = storage.clone();
MapStateLayer::new(|_| storage).into_layer(MetadataService)
};
let name = &rng()
.sample_iter(&Alphanumeric)
.take(15)
.map(char::from)
.collect::<String>()[..];
let num_partitions = rng().random_range(1..64);
let replication_factor = rng().random_range(0..64);
let response = create_topic
.serve(
Context::default(),
CreateTopicsRequest::default()
.validate_only(Some(false))
.topics(Some(
[CreatableTopic::default()
.name(name.into())
.num_partitions(num_partitions)
.replication_factor(replication_factor)
.assignments(Some([].into()))
.configs(Some([].into()))]
.into(),
)),
)
.await?;
let topic_id = response.topics.as_deref().unwrap_or_default()[0]
.topic_id
.unwrap();
{
let response = metadata
.serve(
Context::default(),
MetadataRequest::default()
.allow_auto_topic_creation(Some(false))
.include_cluster_authorized_operations(Some(false))
.include_cluster_authorized_operations(Some(false))
.topics(Some(
[MetadataRequestTopic::default().topic_id(Some(topic_id))].into(),
)),
)
.await?;
let topics = response.topics.as_deref().unwrap_or_default();
assert_eq!(1, topics.len());
assert_eq!(ErrorCode::None, ErrorCode::try_from(topics[0].error_code)?);
assert_eq!(Some(name), topics[0].name.as_deref());
assert_eq!(Some(topic_id), topics[0].topic_id);
}
{
let response = metadata
.serve(
Context::default(),
MetadataRequest::default()
.allow_auto_topic_creation(Some(false))
.include_cluster_authorized_operations(Some(false))
.include_cluster_authorized_operations(Some(false))
.topics(Some(
[MetadataRequestTopic::default().name(Some(name.into()))].into(),
)),
)
.await?;
let topics = response.topics.as_deref().unwrap_or_default();
assert_eq!(1, topics.len());
assert_eq!(ErrorCode::None, ErrorCode::try_from(topics[0].error_code)?);
assert_eq!(Some(name), topics[0].name.as_deref());
assert_eq!(Some(topic_id), topics[0].topic_id);
}
{
let response = create_topic
.serve(
Context::default(),
CreateTopicsRequest::default()
.validate_only(Some(false))
.topics(Some(
[CreatableTopic::default()
.name(name.into())
.num_partitions(num_partitions)
.replication_factor(replication_factor)
.assignments(Some([].into()))
.configs(Some([].into()))]
.into(),
)),
)
.await?;
let topics = response.topics.as_deref().unwrap_or_default();
assert_eq!(1, topics.len());
assert_eq!(
ErrorCode::TopicAlreadyExists,
ErrorCode::try_from(topics[0].error_code)?
);
}
{
let response = delete_topic
.serve(
Context::default(),
DeleteTopicsRequest::default().topic_names(Some([name.into()].into())),
)
.await?;
let results = response.responses.as_deref().unwrap_or_default();
assert_eq!(1, results.len());
assert_eq!(ErrorCode::None, ErrorCode::try_from(results[0].error_code)?);
}
{
let response = delete_topic
.serve(
Context::default(),
DeleteTopicsRequest::default().topic_names(Some([name.into()].into())),
)
.await?;
let results = response.responses.as_deref().unwrap_or_default();
assert_eq!(1, results.len());
assert_eq!(
ErrorCode::UnknownTopicOrPartition,
ErrorCode::try_from(results[0].error_code)?
);
}
Ok(())
}