1use super::list_types::{Codec, TopicDescription};
2use crate::client::TimeoutSettings;
3use crate::client_common::TokenCache;
4use crate::client_topic::list_types::{AlterConsumer, Consumer, MeteringMode};
5use crate::client_topic::topicreader::reader::{TopicReader, TopicSelectors};
6use crate::client_topic::topicwriter::writer::TopicWriter;
7use crate::client_topic::topicwriter::writer_options::{
8 TopicWriterOptions, TopicWriterOptionsBuilder,
9};
10use crate::errors;
11use crate::grpc_connection_manager::GrpcConnectionManager;
12use crate::grpc_wrapper::raw_topic_service::alter_topic::RawAlterTopicRequest;
13use crate::grpc_wrapper::raw_topic_service::create_topic::RawCreateTopicRequest;
14use crate::grpc_wrapper::raw_topic_service::describe_consumer::RawDescribeConsumerRequest;
15use crate::grpc_wrapper::raw_topic_service::describe_topic::RawDescribeTopicRequest;
16use crate::grpc_wrapper::raw_topic_service::drop_topic::RawDropTopicRequest;
17use crate::YdbError::InternalError;
18use crate::{grpc_wrapper, YdbResult};
19use derive_builder::{Builder, UninitializedFieldError};
20use std::collections::HashMap;
21use std::time::Duration;
22
23#[derive(Builder)]
24#[builder(build_fn(error = "errors::YdbError"))]
25pub struct CreateTopicOptions {
26 #[builder(default)]
28 pub min_active_partitions: i64,
29 #[builder(default)]
30 pub partition_count_limit: i64,
31 #[builder(setter(strip_option), default)]
32 pub retention_period: Option<Duration>,
33 #[builder(default)]
34 pub retention_storage_mb: i64,
35 #[builder(default)]
36 pub supported_codecs: Vec<Codec>,
37 #[builder(default)]
38 pub partition_write_speed_bytes_per_second: i64,
39 #[builder(default)]
40 pub partition_write_burst_bytes: i64,
41 #[builder(default)]
42 pub consumers: Vec<Consumer>,
43 #[builder(default)]
44 pub attributes: HashMap<String, String>,
45 #[builder(setter(strip_option), default)]
46 pub metering_mode: Option<MeteringMode>,
47}
48
49#[derive(Builder)]
50#[builder(build_fn(error = "errors::YdbError"))]
51pub struct AlterTopicOptions {
52 #[builder(setter(strip_option), default)]
54 pub set_min_active_partitions: Option<i64>,
55
56 #[builder(setter(strip_option), default)]
57 pub set_partition_count_limit: Option<i64>,
58
59 #[builder(setter(strip_option), default)]
60 pub set_retention_period: Option<Duration>,
61
62 #[builder(setter(strip_option), default)]
63 pub set_retention_storage_mb: Option<i64>,
64
65 #[builder(setter(strip_option), default)]
66 pub set_supported_codecs: Option<Vec<Codec>>,
67
68 #[builder(setter(strip_option), default)]
69 pub set_partition_write_speed_bytes_per_second: Option<i64>,
70
71 #[builder(setter(strip_option), default)]
72 pub set_partition_write_burst_bytes: Option<i64>,
73
74 #[builder(default)]
75 pub alter_attributes: HashMap<String, String>,
76
77 #[builder(default)]
78 pub add_consumers: Vec<Consumer>,
79
80 #[builder(default)]
81 pub drop_consumers: Vec<String>,
82
83 #[builder(default)]
84 pub alter_consumers: Vec<AlterConsumer>,
85
86 #[builder(setter(strip_option), default)]
87 pub set_metering_mode: Option<MeteringMode>,
88}
89
90#[derive(Builder)]
91#[builder(build_fn(error = "errors::YdbError"))]
92pub struct DescribeTopicOptions {
93 #[builder(default)]
95 pub include_stats: bool,
96 #[builder(default)]
97 pub include_location: bool,
98}
99
100#[derive(Builder)]
101#[builder(build_fn(error = "errors::YdbError"))]
102pub struct DescribeConsumerOptions {
103 #[builder(default)]
105 pub include_stats: bool,
106 #[builder(default)]
107 pub include_location: bool,
108}
109
110impl From<UninitializedFieldError> for errors::YdbError {
111 fn from(ufe: UninitializedFieldError) -> Self {
112 InternalError(format!("Error during build type: {ufe}"))
113 }
114}
115
116#[derive(Clone)]
117pub struct TopicClient {
118 timeouts: TimeoutSettings,
119 connection_manager: GrpcConnectionManager,
120 token_cache: TokenCache,
121}
122
123impl TopicClient {
124 pub(crate) fn new(
125 timeouts: TimeoutSettings,
126 connection_manager: GrpcConnectionManager,
127 token_cache: TokenCache,
128 ) -> Self {
129 Self {
130 timeouts,
131 connection_manager,
132 token_cache,
133 }
134 }
135
136 pub async fn create_topic(
137 &mut self,
138 path: String,
139 options: CreateTopicOptions,
140 ) -> YdbResult<()> {
141 let req = RawCreateTopicRequest::new(path, self.timeouts.operation_params(), options);
142
143 let mut service = self.raw_client_connection().await?;
144 service.create_topic(req).await?;
145
146 Ok(())
147 }
148
149 pub async fn alter_topic(&mut self, path: String, options: AlterTopicOptions) -> YdbResult<()> {
150 let req = RawAlterTopicRequest::new(path, self.timeouts.operation_params(), options);
151
152 let mut service = self.raw_client_connection().await?;
153 service.alter_topic(req).await?;
154
155 Ok(())
156 }
157
158 pub async fn describe_consumer(
159 &mut self,
160 path: String,
161 consumer: String,
162 options: DescribeConsumerOptions,
163 ) -> YdbResult<super::list_types::ConsumerDescription> {
164 let req = RawDescribeConsumerRequest::new(
165 path,
166 consumer,
167 self.timeouts.operation_params(),
168 options,
169 );
170
171 let mut service = self.raw_client_connection().await?;
172 let result = service.describe_consumer(req).await?;
173 let description = super::list_types::ConsumerDescription::from(result);
174
175 Ok(description)
176 }
177
178 pub async fn describe_topic(
179 &mut self,
180 path: String,
181 options: DescribeTopicOptions,
182 ) -> YdbResult<TopicDescription> {
183 let req = RawDescribeTopicRequest::new(path, self.timeouts.operation_params(), options);
184
185 let mut service = self.raw_client_connection().await?;
186 let result = service.describe_topic(req).await?;
187 let description = TopicDescription::from(result);
188
189 Ok(description)
190 }
191
192 pub async fn drop_topic(&mut self, path: String) -> YdbResult<()> {
193 let req = RawDropTopicRequest {
194 operation_params: self.timeouts.operation_params(),
195 path,
196 };
197
198 let mut service = self.raw_client_connection().await?;
199 service.delete_topic(req).await?;
200
201 Ok(())
202 }
203
204 pub async fn create_reader(
205 &mut self,
206 consumer: String,
207 topic: impl Into<TopicSelectors>,
208 ) -> YdbResult<TopicReader> {
209 TopicReader::new(
210 consumer,
211 topic.into(),
212 self.connection_manager.clone(),
213 self.token_cache.clone(),
214 )
215 .await
216 }
217
218 pub async fn create_writer_with_params(
219 &mut self,
220 writer_options: TopicWriterOptions,
221 ) -> YdbResult<TopicWriter> {
222 TopicWriter::new(writer_options, self.connection_manager.clone()).await
223 }
224
225 pub async fn create_writer(&mut self, path: String) -> YdbResult<TopicWriter> {
226 TopicWriter::new(
227 TopicWriterOptionsBuilder::default()
228 .topic_path(path)
229 .build()
230 .unwrap(),
231 self.connection_manager.clone(),
232 )
233 .await
234 }
235
236 pub(crate) async fn raw_client_connection(
237 &self,
238 ) -> YdbResult<grpc_wrapper::raw_topic_service::client::RawTopicClient> {
239 self.connection_manager
240 .get_auth_service(grpc_wrapper::raw_topic_service::client::RawTopicClient::new)
241 .await
242 }
243}