use super::list_types::{Codec, TopicDescription};
use crate::client::TimeoutSettings;
use crate::client_common::TokenCache;
use crate::client_topic::list_types::{AlterConsumer, Consumer, MeteringMode};
use crate::client_topic::topicreader::reader::{TopicReader, TopicSelectors};
use crate::client_topic::topicreader::reader_options::{
TopicReaderOptions, TopicReaderOptionsBuilder,
};
use crate::client_topic::topicwriter::writer::TopicWriter;
use crate::client_topic::topicwriter::writer_options::{
TopicWriterOptions, TopicWriterOptionsBuilder,
};
use crate::errors;
use crate::grpc_connection_manager::GrpcConnectionManager;
use crate::grpc_wrapper::raw_topic_service::alter_topic::RawAlterTopicRequest;
use crate::grpc_wrapper::raw_topic_service::create_topic::RawCreateTopicRequest;
use crate::grpc_wrapper::raw_topic_service::describe_consumer::RawDescribeConsumerRequest;
use crate::grpc_wrapper::raw_topic_service::describe_topic::RawDescribeTopicRequest;
use crate::grpc_wrapper::raw_topic_service::drop_topic::RawDropTopicRequest;
use crate::YdbError::InternalError;
use crate::{grpc_wrapper, YdbResult};
use derive_builder::{Builder, UninitializedFieldError};
use std::collections::HashMap;
use std::time::Duration;
#[derive(Builder)]
#[builder(build_fn(error = "errors::YdbError"))]
pub struct CreateTopicOptions {
#[builder(default)]
pub min_active_partitions: i64,
#[builder(default)]
pub partition_count_limit: i64,
#[builder(setter(strip_option), default)]
pub retention_period: Option<Duration>,
#[builder(default)]
pub retention_storage_mb: i64,
#[builder(default)]
pub supported_codecs: Vec<Codec>,
#[builder(default)]
pub partition_write_speed_bytes_per_second: i64,
#[builder(default)]
pub partition_write_burst_bytes: i64,
#[builder(default)]
pub consumers: Vec<Consumer>,
#[builder(default)]
pub attributes: HashMap<String, String>,
#[builder(setter(strip_option), default)]
pub metering_mode: Option<MeteringMode>,
}
#[derive(Builder)]
#[builder(build_fn(error = "errors::YdbError"))]
pub struct AlterTopicOptions {
#[builder(setter(strip_option), default)]
pub set_min_active_partitions: Option<i64>,
#[builder(setter(strip_option), default)]
pub set_partition_count_limit: Option<i64>,
#[builder(setter(strip_option), default)]
pub set_retention_period: Option<Duration>,
#[builder(setter(strip_option), default)]
pub set_retention_storage_mb: Option<i64>,
#[builder(setter(strip_option), default)]
pub set_supported_codecs: Option<Vec<Codec>>,
#[builder(setter(strip_option), default)]
pub set_partition_write_speed_bytes_per_second: Option<i64>,
#[builder(setter(strip_option), default)]
pub set_partition_write_burst_bytes: Option<i64>,
#[builder(default)]
pub alter_attributes: HashMap<String, String>,
#[builder(default)]
pub add_consumers: Vec<Consumer>,
#[builder(default)]
pub drop_consumers: Vec<String>,
#[builder(default)]
pub alter_consumers: Vec<AlterConsumer>,
#[builder(setter(strip_option), default)]
pub set_metering_mode: Option<MeteringMode>,
}
#[derive(Builder)]
#[builder(build_fn(error = "errors::YdbError"))]
pub struct DescribeTopicOptions {
#[builder(default)]
pub include_stats: bool,
#[builder(default)]
pub include_location: bool,
}
#[derive(Builder)]
#[builder(build_fn(error = "errors::YdbError"))]
pub struct DescribeConsumerOptions {
#[builder(default)]
pub include_stats: bool,
#[builder(default)]
pub include_location: bool,
}
impl From<UninitializedFieldError> for errors::YdbError {
fn from(ufe: UninitializedFieldError) -> Self {
InternalError(format!("Error during build type: {ufe}"))
}
}
#[derive(Clone)]
pub struct TopicClient {
timeouts: TimeoutSettings,
connection_manager: GrpcConnectionManager,
token_cache: TokenCache,
}
impl TopicClient {
pub(crate) fn new(
timeouts: TimeoutSettings,
connection_manager: GrpcConnectionManager,
token_cache: TokenCache,
) -> Self {
Self {
timeouts,
connection_manager,
token_cache,
}
}
pub async fn create_topic(
&mut self,
path: String,
options: CreateTopicOptions,
) -> YdbResult<()> {
let req = RawCreateTopicRequest::new(path, self.timeouts.operation_params(), options);
let mut service = self.raw_client_connection().await?;
service.create_topic(req).await?;
Ok(())
}
pub async fn alter_topic(&mut self, path: String, options: AlterTopicOptions) -> YdbResult<()> {
let req = RawAlterTopicRequest::new(path, self.timeouts.operation_params(), options);
let mut service = self.raw_client_connection().await?;
service.alter_topic(req).await?;
Ok(())
}
pub async fn describe_consumer(
&mut self,
path: String,
consumer: String,
options: DescribeConsumerOptions,
) -> YdbResult<super::list_types::ConsumerDescription> {
let req = RawDescribeConsumerRequest::new(
path,
consumer,
self.timeouts.operation_params(),
options,
);
let mut service = self.raw_client_connection().await?;
let result = service.describe_consumer(req).await?;
let description = super::list_types::ConsumerDescription::from(result);
Ok(description)
}
pub async fn describe_topic(
&mut self,
path: String,
options: DescribeTopicOptions,
) -> YdbResult<TopicDescription> {
let req = RawDescribeTopicRequest::new(path, self.timeouts.operation_params(), options);
let mut service = self.raw_client_connection().await?;
let result = service.describe_topic(req).await?;
let description = TopicDescription::from(result);
Ok(description)
}
pub async fn drop_topic(&mut self, path: String) -> YdbResult<()> {
let req = RawDropTopicRequest {
operation_params: self.timeouts.operation_params(),
path,
};
let mut service = self.raw_client_connection().await?;
service.delete_topic(req).await?;
Ok(())
}
pub async fn create_reader(
&mut self,
consumer: String,
topic: impl Into<TopicSelectors>,
) -> YdbResult<TopicReader> {
let options = TopicReaderOptionsBuilder::default()
.consumer(consumer)
.topic(topic.into())
.build()?;
TopicReader::new(
options,
self.connection_manager.clone(),
self.token_cache.clone(),
)
.await
}
pub async fn create_reader_with_params(
&mut self,
options: TopicReaderOptions,
) -> YdbResult<TopicReader> {
TopicReader::new(
options,
self.connection_manager.clone(),
self.token_cache.clone(),
)
.await
}
pub async fn create_writer_with_params(
&mut self,
writer_options: TopicWriterOptions,
) -> YdbResult<TopicWriter> {
TopicWriter::new(writer_options, self.connection_manager.clone()).await
}
pub async fn create_writer(&mut self, path: String) -> YdbResult<TopicWriter> {
TopicWriter::new(
TopicWriterOptionsBuilder::default()
.topic_path(path)
.build()
.unwrap(),
self.connection_manager.clone(),
)
.await
}
pub(crate) async fn raw_client_connection(
&self,
) -> YdbResult<grpc_wrapper::raw_topic_service::client::RawTopicClient> {
self.connection_manager
.get_auth_service(grpc_wrapper::raw_topic_service::client::RawTopicClient::new)
.await
}
}