ydb/client_topic/
client.rs

1use super::list_types::{Codec, TopicDescription};
2use crate::client::TimeoutSettings;
3use crate::client_common::TokenCache;
4use crate::client_topic::list_types::{AlterConsumer, Consumer, MeteringMode};
5use crate::client_topic::topicreader::reader::{TopicReader, TopicSelectors};
6use crate::client_topic::topicwriter::writer::TopicWriter;
7use crate::client_topic::topicwriter::writer_options::{
8    TopicWriterOptions, TopicWriterOptionsBuilder,
9};
10use crate::errors;
11use crate::grpc_connection_manager::GrpcConnectionManager;
12use crate::grpc_wrapper::raw_topic_service::alter_topic::RawAlterTopicRequest;
13use crate::grpc_wrapper::raw_topic_service::create_topic::RawCreateTopicRequest;
14use crate::grpc_wrapper::raw_topic_service::describe_consumer::RawDescribeConsumerRequest;
15use crate::grpc_wrapper::raw_topic_service::describe_topic::RawDescribeTopicRequest;
16use crate::grpc_wrapper::raw_topic_service::drop_topic::RawDropTopicRequest;
17use crate::YdbError::InternalError;
18use crate::{grpc_wrapper, YdbResult};
19use derive_builder::{Builder, UninitializedFieldError};
20use std::collections::HashMap;
21use std::time::Duration;
22
23#[derive(Builder)]
24#[builder(build_fn(error = "errors::YdbError"))]
25pub struct CreateTopicOptions {
26    // Use CreateTopicOptionsBuilder
27    #[builder(default)]
28    pub min_active_partitions: i64,
29    #[builder(default)]
30    pub partition_count_limit: i64,
31    #[builder(setter(strip_option), default)]
32    pub retention_period: Option<Duration>,
33    #[builder(default)]
34    pub retention_storage_mb: i64,
35    #[builder(default)]
36    pub supported_codecs: Vec<Codec>,
37    #[builder(default)]
38    pub partition_write_speed_bytes_per_second: i64,
39    #[builder(default)]
40    pub partition_write_burst_bytes: i64,
41    #[builder(default)]
42    pub consumers: Vec<Consumer>,
43    #[builder(default)]
44    pub attributes: HashMap<String, String>,
45    #[builder(setter(strip_option), default)]
46    pub metering_mode: Option<MeteringMode>,
47}
48
49#[derive(Builder)]
50#[builder(build_fn(error = "errors::YdbError"))]
51pub struct AlterTopicOptions {
52    // Use AlterTopicOptionsBuilder
53    #[builder(setter(strip_option), default)]
54    pub set_min_active_partitions: Option<i64>,
55
56    #[builder(setter(strip_option), default)]
57    pub set_partition_count_limit: Option<i64>,
58
59    #[builder(setter(strip_option), default)]
60    pub set_retention_period: Option<Duration>,
61
62    #[builder(setter(strip_option), default)]
63    pub set_retention_storage_mb: Option<i64>,
64
65    #[builder(setter(strip_option), default)]
66    pub set_supported_codecs: Option<Vec<Codec>>,
67
68    #[builder(setter(strip_option), default)]
69    pub set_partition_write_speed_bytes_per_second: Option<i64>,
70
71    #[builder(setter(strip_option), default)]
72    pub set_partition_write_burst_bytes: Option<i64>,
73
74    #[builder(default)]
75    pub alter_attributes: HashMap<String, String>,
76
77    #[builder(default)]
78    pub add_consumers: Vec<Consumer>,
79
80    #[builder(default)]
81    pub drop_consumers: Vec<String>,
82
83    #[builder(default)]
84    pub alter_consumers: Vec<AlterConsumer>,
85
86    #[builder(setter(strip_option), default)]
87    pub set_metering_mode: Option<MeteringMode>,
88}
89
90#[derive(Builder)]
91#[builder(build_fn(error = "errors::YdbError"))]
92pub struct DescribeTopicOptions {
93    // Use DescribeTopicOptionsBuilder
94    #[builder(default)]
95    pub include_stats: bool,
96    #[builder(default)]
97    pub include_location: bool,
98}
99
100#[derive(Builder)]
101#[builder(build_fn(error = "errors::YdbError"))]
102pub struct DescribeConsumerOptions {
103    // Use DescribeConsumerOptionsBuilder
104    #[builder(default)]
105    pub include_stats: bool,
106    #[builder(default)]
107    pub include_location: bool,
108}
109
110impl From<UninitializedFieldError> for errors::YdbError {
111    fn from(ufe: UninitializedFieldError) -> Self {
112        InternalError(format!("Error during build type: {ufe}"))
113    }
114}
115
116#[derive(Clone)]
117pub struct TopicClient {
118    timeouts: TimeoutSettings,
119    connection_manager: GrpcConnectionManager,
120    token_cache: TokenCache,
121}
122
123impl TopicClient {
124    pub(crate) fn new(
125        timeouts: TimeoutSettings,
126        connection_manager: GrpcConnectionManager,
127        token_cache: TokenCache,
128    ) -> Self {
129        Self {
130            timeouts,
131            connection_manager,
132            token_cache,
133        }
134    }
135
136    pub async fn create_topic(
137        &mut self,
138        path: String,
139        options: CreateTopicOptions,
140    ) -> YdbResult<()> {
141        let req = RawCreateTopicRequest::new(path, self.timeouts.operation_params(), options);
142
143        let mut service = self.raw_client_connection().await?;
144        service.create_topic(req).await?;
145
146        Ok(())
147    }
148
149    pub async fn alter_topic(&mut self, path: String, options: AlterTopicOptions) -> YdbResult<()> {
150        let req = RawAlterTopicRequest::new(path, self.timeouts.operation_params(), options);
151
152        let mut service = self.raw_client_connection().await?;
153        service.alter_topic(req).await?;
154
155        Ok(())
156    }
157
158    pub async fn describe_consumer(
159        &mut self,
160        path: String,
161        consumer: String,
162        options: DescribeConsumerOptions,
163    ) -> YdbResult<super::list_types::ConsumerDescription> {
164        let req = RawDescribeConsumerRequest::new(
165            path,
166            consumer,
167            self.timeouts.operation_params(),
168            options,
169        );
170
171        let mut service = self.raw_client_connection().await?;
172        let result = service.describe_consumer(req).await?;
173        let description = super::list_types::ConsumerDescription::from(result);
174
175        Ok(description)
176    }
177
178    pub async fn describe_topic(
179        &mut self,
180        path: String,
181        options: DescribeTopicOptions,
182    ) -> YdbResult<TopicDescription> {
183        let req = RawDescribeTopicRequest::new(path, self.timeouts.operation_params(), options);
184
185        let mut service = self.raw_client_connection().await?;
186        let result = service.describe_topic(req).await?;
187        let description = TopicDescription::from(result);
188
189        Ok(description)
190    }
191
192    pub async fn drop_topic(&mut self, path: String) -> YdbResult<()> {
193        let req = RawDropTopicRequest {
194            operation_params: self.timeouts.operation_params(),
195            path,
196        };
197
198        let mut service = self.raw_client_connection().await?;
199        service.delete_topic(req).await?;
200
201        Ok(())
202    }
203
204    pub async fn create_reader(
205        &mut self,
206        consumer: String,
207        topic: impl Into<TopicSelectors>,
208    ) -> YdbResult<TopicReader> {
209        TopicReader::new(
210            consumer,
211            topic.into(),
212            self.connection_manager.clone(),
213            self.token_cache.clone(),
214        )
215        .await
216    }
217
218    pub async fn create_writer_with_params(
219        &mut self,
220        writer_options: TopicWriterOptions,
221    ) -> YdbResult<TopicWriter> {
222        TopicWriter::new(writer_options, self.connection_manager.clone()).await
223    }
224
225    pub async fn create_writer(&mut self, path: String) -> YdbResult<TopicWriter> {
226        TopicWriter::new(
227            TopicWriterOptionsBuilder::default()
228                .topic_path(path)
229                .build()
230                .unwrap(),
231            self.connection_manager.clone(),
232        )
233        .await
234    }
235
236    pub(crate) async fn raw_client_connection(
237        &self,
238    ) -> YdbResult<grpc_wrapper::raw_topic_service::client::RawTopicClient> {
239        self.connection_manager
240            .get_auth_service(grpc_wrapper::raw_topic_service::client::RawTopicClient::new)
241            .await
242    }
243}