Skip to main content

ydb/client_topic/list_types.rs

1use crate::grpc_wrapper::raw_topic_service::common::codecs::{RawCodec, RawSupportedCodecs};
2use crate::grpc_wrapper::raw_topic_service::common::consumer::{RawAlterConsumer, RawConsumer};
3use crate::grpc_wrapper::raw_topic_service::common::metering_mode::RawMeteringMode;
4use crate::grpc_wrapper::raw_topic_service::common::partition::{
5    RawPartitionInfo, RawPartitionLocation, RawPartitionStats,
6};
7use crate::grpc_wrapper::raw_topic_service::common::partitioning_settings::RawPartitioningSettings;
8use crate::grpc_wrapper::raw_topic_service::common::topic::RawTopicStats;
9use crate::grpc_wrapper::raw_topic_service::describe_topic::RawDescribeTopicResult;
10use derive_builder::Builder;
11use std::collections::HashMap;
12use std::option::Option;
13use std::time::SystemTime;
14
/// Codec identifier, stored as its numeric code.
///
/// The derived `Default` yields code `0`, which matches none of the
/// predefined constants below.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Codec {
    /// Numeric code of the codec.
    pub code: i32,
}

impl Codec {
    /// Predefined codec with code `1`.
    pub const RAW: Codec = Codec { code: 1 };
    /// Predefined codec with code `2`.
    pub const GZIP: Codec = Codec { code: 2 };
    /// Predefined codec with code `3`.
    pub const LZOP: Codec = Codec { code: 3 };
    /// Predefined codec with code `4`.
    pub const ZSTD: Codec = Codec { code: 4 };

    /// Returns `true` when the code lies in the half-open range
    /// `10000..20000` (lower bound included, upper bound excluded).
    pub fn is_custom(&self) -> bool {
        (10000..20000).contains(&self.code)
    }
}
30
31impl From<RawCodec> for Codec {
32    fn from(value: RawCodec) -> Self {
33        Self { code: value.code }
34    }
35}
36
37impl From<Codec> for RawCodec {
38    fn from(value: Codec) -> Self {
39        Self { code: value.code }
40    }
41}
42
43impl From<RawSupportedCodecs> for Vec<Codec> {
44    fn from(value: RawSupportedCodecs) -> Vec<Codec> {
45        value.codecs.into_iter().map(Codec::from).collect()
46    }
47}
48
49impl From<Vec<Codec>> for RawSupportedCodecs {
50    fn from(value: Vec<Codec>) -> RawSupportedCodecs {
51        Self {
52            codecs: value.into_iter().map(RawCodec::from).collect(),
53        }
54    }
55}
56
/// Metering mode of a topic.
///
/// An unspecified mode is not a variant here; it is represented as
/// `Option::<MeteringMode>::None` by the conversions in this file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MeteringMode {
    /// Counterpart of `RawMeteringMode::ReservedCapacity`.
    ReservedCapacity,
    /// Counterpart of `RawMeteringMode::RequestUnits`.
    RequestUnits,
}
62
63impl From<RawMeteringMode> for Option<MeteringMode> {
64    fn from(value: RawMeteringMode) -> Self {
65        match value {
66            RawMeteringMode::Unspecified => None,
67            RawMeteringMode::ReservedCapacity => Some(MeteringMode::ReservedCapacity),
68            RawMeteringMode::RequestUnits => Some(MeteringMode::RequestUnits),
69        }
70    }
71}
72
73impl From<Option<MeteringMode>> for RawMeteringMode {
74    fn from(value: Option<MeteringMode>) -> Self {
75        match value {
76            None => RawMeteringMode::Unspecified,
77            Some(MeteringMode::RequestUnits) => RawMeteringMode::RequestUnits,
78            Some(MeteringMode::ReservedCapacity) => RawMeteringMode::ReservedCapacity,
79        }
80    }
81}
82
83#[derive(Debug, Clone, Builder)]
84#[builder(build_fn(error = "crate::errors::YdbError"))]
85pub struct Consumer {
86    pub name: String,
87
88    #[builder(default)]
89    pub important: bool,
90
91    #[builder(default)]
92    pub read_from: Option<SystemTime>,
93
94    #[builder(default)]
95    pub supported_codecs: Vec<Codec>,
96
97    #[builder(default)]
98    pub attributes: HashMap<String, String>,
99
100    #[builder(default)]
101    pub consumer_stats: Option<ConsumerStats>,
102}
103
104impl From<RawConsumer> for Consumer {
105    fn from(consumer: RawConsumer) -> Self {
106        Self {
107            name: consumer.name,
108            important: consumer.important,
109            read_from: consumer.read_from.map(|x| x.into()),
110            supported_codecs: consumer.supported_codecs.into(),
111            attributes: consumer.attributes,
112            consumer_stats: None,
113        }
114    }
115}
116
117impl From<Consumer> for RawConsumer {
118    fn from(consumer: Consumer) -> Self {
119        Self {
120            name: consumer.name,
121            important: consumer.important,
122            read_from: consumer.read_from.map(|x| x.into()),
123            supported_codecs: consumer.supported_codecs.into(),
124            attributes: consumer.attributes,
125            consumer_stats: None,
126        }
127    }
128}
129
130#[derive(Debug, Clone, Builder)]
131#[builder(build_fn(error = "crate::errors::YdbError"))]
132pub struct AlterConsumer {
133    pub name: String,
134
135    #[builder(default)]
136    pub set_important: Option<bool>,
137
138    #[builder(default)]
139    pub set_read_from: Option<SystemTime>,
140
141    #[builder(default)]
142    pub set_supported_codecs: Option<Vec<Codec>>,
143
144    #[builder(default)]
145    pub alter_attributes: HashMap<String, String>,
146}
147
148impl From<AlterConsumer> for RawAlterConsumer {
149    fn from(consumer: AlterConsumer) -> Self {
150        Self {
151            name: consumer.name,
152            set_important: consumer.set_important,
153            set_read_from: consumer.set_read_from.map(|x| x.into()),
154            set_supported_codecs: consumer.set_supported_codecs.map(|x| x.into()),
155            alter_attributes: consumer.alter_attributes,
156        }
157    }
158}
159
/// Partitioning settings of a topic. Both fields default to `0`.
#[derive(Clone, Debug, Default)]
pub struct PartitioningSettings {
    /// Minimum number of active partitions.
    pub min_active_partitions: i64,
    /// Limit on the number of partitions.
    pub partition_count_limit: i64,
}
165
166impl From<RawPartitioningSettings> for PartitioningSettings {
167    fn from(value: RawPartitioningSettings) -> Self {
168        Self {
169            min_active_partitions: value.min_active_partitions,
170            partition_count_limit: value.partition_count_limit,
171        }
172    }
173}
174
175impl From<PartitioningSettings> for RawPartitioningSettings {
176    fn from(value: PartitioningSettings) -> Self {
177        Self {
178            min_active_partitions: value.min_active_partitions,
179            partition_count_limit: value.partition_count_limit,
180        }
181    }
182}
183
/// Statistics of a single partition.
#[derive(Clone, Debug)]
pub struct PartitionStats {
    /// Start of the partition offset range.
    pub start_offset: i64,
    /// End of the partition offset range.
    pub end_offset: i64,
    /// Stored size in bytes.
    pub store_size_bytes: i64,
    /// Time of the last write.
    pub last_write_time: std::time::SystemTime,
    /// Maximum write time lag.
    pub max_write_time_lag: std::time::Duration,
    /// Bytes written, per-minute window.
    pub bytes_written_per_minute: i64,
    /// Bytes written, per-hour window.
    pub bytes_written_per_hour: i64,
    /// Bytes written, per-day window.
    pub bytes_written_per_day: i64,
}

/// All counters are zero, the lag is zero and the last write time is the Unix epoch.
impl Default for PartitionStats {
    fn default() -> Self {
        let no_lag = std::time::Duration::from_secs(0);
        let epoch = std::time::SystemTime::UNIX_EPOCH;

        PartitionStats {
            start_offset: 0,
            end_offset: 0,
            store_size_bytes: 0,
            last_write_time: epoch,
            max_write_time_lag: no_lag,
            bytes_written_per_minute: 0,
            bytes_written_per_hour: 0,
            bytes_written_per_day: 0,
        }
    }
}
210
211impl From<RawPartitionStats> for PartitionStats {
212    fn from(value: RawPartitionStats) -> Self {
213        Self {
214            start_offset: value.partition_offsets.start,
215            end_offset: value.partition_offsets.end,
216            store_size_bytes: value.store_size_bytes,
217            last_write_time: value.last_write_time.into(),
218            max_write_time_lag: value.max_write_time_lag.into(),
219            bytes_written_per_minute: value.bytes_written.per_minute,
220            bytes_written_per_hour: value.bytes_written.per_hour,
221            bytes_written_per_day: value.bytes_written.per_day,
222        }
223    }
224}
225
/// Location of a partition. Both fields default to `0`.
#[derive(Clone, Debug, Default)]
pub struct PartitionLocation {
    /// Identifier of the node.
    pub node_id: i32,
    /// Generation number.
    pub generation: i64,
}
231
232impl From<RawPartitionLocation> for PartitionLocation {
233    fn from(value: RawPartitionLocation) -> Self {
234        Self {
235            node_id: value.node_id,
236            generation: value.generation,
237        }
238    }
239}
240
241/// PartitionInfo contains info about partition.
242#[derive(Debug, Clone)]
243pub struct PartitionInfo {
244    pub partition_id: i64,
245    pub active: bool,
246    pub child_partition_ids: Vec<i64>,
247    pub parent_partition_ids: Vec<i64>,
248    pub stats: Option<PartitionStats>,
249    pub location: Option<PartitionLocation>,
250}
251
252impl From<RawPartitionInfo> for PartitionInfo {
253    fn from(value: RawPartitionInfo) -> Self {
254        Self {
255            partition_id: value.partition_id,
256            active: value.active,
257            child_partition_ids: value.child_partition_ids,
258            parent_partition_ids: value.parent_partition_ids,
259            stats: value.partition_stats.map(|x| x.into()),
260            location: value.partition_location.map(|x| x.into()),
261        }
262    }
263}
264
/// Statistics of a whole topic.
#[derive(Clone, Debug)]
pub struct TopicStats {
    /// Stored size in bytes.
    pub store_size_bytes: i64,
    /// Minimum last-write time.
    pub min_last_write_time: std::time::SystemTime,
    /// Maximum write time lag.
    pub max_write_time_lag: std::time::Duration,
    /// Bytes written, per-minute window.
    pub bytes_written_per_minute: i64,
    /// Bytes written, per-hour window.
    pub bytes_written_per_hour: i64,
    /// Bytes written, per-day window.
    pub bytes_written_per_day: i64,
}
274
275impl From<RawTopicStats> for TopicStats {
276    fn from(value: RawTopicStats) -> Self {
277        Self {
278            store_size_bytes: value.store_size_bytes,
279            min_last_write_time: value.min_last_write_time.into(),
280            max_write_time_lag: value.max_write_time_lag.into(),
281            bytes_written_per_minute: value.bytes_written.per_minute,
282            bytes_written_per_hour: value.bytes_written.per_hour,
283            bytes_written_per_day: value.bytes_written.per_day,
284        }
285    }
286}
287
288/// TopicDescription contains info about topic.
289#[derive(Debug, Clone)]
290pub struct TopicDescription {
291    pub path: String,
292    pub partitioning_settings: PartitioningSettings,
293    pub partitions: Vec<PartitionInfo>,
294    pub retention_period: std::time::Duration,
295    pub retention_storage_mb: Option<i64>,
296    pub supported_codecs: Vec<Codec>,
297    pub partition_write_speed_bytes_per_second: i64,
298    pub partition_total_read_speed_bytes_per_second: i64,
299    pub partition_consumer_read_speed_bytes_per_second: i64,
300    pub partition_write_burst_bytes: i64,
301    pub attributes: HashMap<String, String>,
302    pub consumers: Vec<Consumer>,
303    pub metering_mode: Option<MeteringMode>,
304    pub stats: Option<TopicStats>,
305}
306
307impl From<RawDescribeTopicResult> for TopicDescription {
308    fn from(value: RawDescribeTopicResult) -> Self {
309        let retention_storage_mb = if value.retention_storage_mb > 0 {
310            Some(value.retention_storage_mb)
311        } else {
312            None
313        };
314
315        Self {
316            path: value.self_.name,
317            partitioning_settings: value.partitioning_settings.into(),
318            partitions: value.partitions.into_iter().map(|x| x.into()).collect(),
319            retention_period: value.retention_period.into(),
320            retention_storage_mb,
321            supported_codecs: value
322                .supported_codecs
323                .codecs
324                .into_iter()
325                .map(|x| x.into())
326                .collect(),
327            partition_write_speed_bytes_per_second: value.partition_write_speed_bytes_per_second,
328            partition_total_read_speed_bytes_per_second: value
329                .partition_total_read_speed_bytes_per_second,
330            partition_consumer_read_speed_bytes_per_second: value
331                .partition_consumer_read_speed_bytes_per_second,
332            partition_write_burst_bytes: value.partition_write_burst_bytes,
333            attributes: value.attributes,
334            consumers: value.consumers.into_iter().map(|x| x.into()).collect(),
335            metering_mode: value.metering_mode.into(),
336            stats: value.topic_stats.map(|x| x.into()),
337        }
338    }
339}
340
/// Aggregated read statistics of a consumer.
#[derive(Clone, Debug)]
pub struct ConsumerStats {
    /// Minimum last-read time across partitions.
    pub min_partitions_last_read_time: SystemTime,
    /// Maximum read time lag.
    pub max_read_time_lag: std::time::Duration,
    /// Maximum write time lag.
    pub max_write_time_lag: std::time::Duration,
    /// Maximum committed time lag.
    pub max_committed_time_lag: std::time::Duration,
    /// Bytes read, per-minute window.
    pub bytes_read_per_minute: i64,
    /// Bytes read, per-hour window.
    pub bytes_read_per_hour: i64,
    /// Bytes read, per-day window.
    pub bytes_read_per_day: i64,
}
351
/// Read statistics of a consumer for a single partition.
#[derive(Clone, Debug)]
pub struct PartitionConsumerStats {
    /// Committed offset.
    pub committed_offset: i64,
    /// Time of the last read.
    pub last_read_time: SystemTime,
    /// Maximum read time lag.
    pub max_read_time_lag: std::time::Duration,
    /// Maximum write time lag.
    pub max_write_time_lag: std::time::Duration,
    /// Maximum committed time lag.
    pub max_committed_time_lag: std::time::Duration,
    /// Bytes read, per-minute window.
    pub bytes_read_per_minute: i64,
    /// Bytes read, per-hour window.
    pub bytes_read_per_hour: i64,
    /// Bytes read, per-day window.
    pub bytes_read_per_day: i64,
}

/// All counters and lags are zero and the last read time is the Unix epoch.
impl Default for PartitionConsumerStats {
    fn default() -> Self {
        // `Duration` is `Copy`, so one zero value serves all three lag fields.
        let no_lag = std::time::Duration::from_secs(0);

        PartitionConsumerStats {
            committed_offset: 0,
            last_read_time: SystemTime::UNIX_EPOCH,
            max_read_time_lag: no_lag,
            max_write_time_lag: no_lag,
            max_committed_time_lag: no_lag,
            bytes_read_per_minute: 0,
            bytes_read_per_hour: 0,
            bytes_read_per_day: 0,
        }
    }
}
378
379#[derive(Debug, Clone)]
380pub struct ConsumerPartitionInfo {
381    pub partition_id: i64,
382    pub active: bool,
383    pub child_partition_ids: Vec<i64>,
384    pub parent_partition_ids: Vec<i64>,
385    pub stats: PartitionStats,
386    pub consumer_stats: PartitionConsumerStats,
387    pub location: PartitionLocation,
388}
389
390#[derive(Debug, Clone)]
391pub struct ConsumerDescription {
392    pub path: String,
393    pub consumer: Consumer,
394    pub consumer_stats: ConsumerStats,
395    pub partitions: Vec<ConsumerPartitionInfo>,
396}
397
398impl From<crate::grpc_wrapper::raw_topic_service::describe_consumer::RawDescribeConsumerResult>
399    for ConsumerDescription
400{
401    fn from(
402        value: crate::grpc_wrapper::raw_topic_service::describe_consumer::RawDescribeConsumerResult,
403    ) -> Self {
404        let consumer_stats = value
405            .consumer
406            .consumer_stats
407            .as_ref()
408            .map(|stats| ConsumerStats {
409                min_partitions_last_read_time: stats
410                    .min_partitions_last_read_time
411                    .clone()
412                    .map(|x| x.into())
413                    .unwrap_or_else(|| std::time::SystemTime::UNIX_EPOCH),
414                max_read_time_lag: stats
415                    .max_read_time_lag
416                    .clone()
417                    .map(|x| x.into())
418                    .unwrap_or_default(),
419                max_write_time_lag: stats
420                    .max_write_time_lag
421                    .clone()
422                    .map(|x| x.into())
423                    .unwrap_or_default(),
424                max_committed_time_lag: stats
425                    .max_committed_time_lag
426                    .clone()
427                    .map(|x| x.into())
428                    .unwrap_or_default(),
429                bytes_read_per_minute: stats
430                    .bytes_read
431                    .as_ref()
432                    .map(|b| b.per_minute)
433                    .unwrap_or_default(),
434                bytes_read_per_hour: stats
435                    .bytes_read
436                    .as_ref()
437                    .map(|b| b.per_hour)
438                    .unwrap_or_default(),
439                bytes_read_per_day: stats
440                    .bytes_read
441                    .as_ref()
442                    .map(|b| b.per_day)
443                    .unwrap_or_default(),
444            })
445            .unwrap_or_else(|| ConsumerStats {
446                min_partitions_last_read_time: std::time::SystemTime::UNIX_EPOCH,
447                max_read_time_lag: std::time::Duration::from_secs(0),
448                max_write_time_lag: std::time::Duration::from_secs(0),
449                max_committed_time_lag: std::time::Duration::from_secs(0),
450                bytes_read_per_minute: 0,
451                bytes_read_per_hour: 0,
452                bytes_read_per_day: 0,
453            });
454        let consumer: Consumer = value.consumer.into();
455
456        let partitions = value
457            .partitions
458            .into_iter()
459            .map(|p| {
460                let partition_info: RawPartitionInfo = p;
461                ConsumerPartitionInfo {
462                    partition_id: partition_info.partition_id,
463                    active: partition_info.active,
464                    child_partition_ids: partition_info.child_partition_ids,
465                    parent_partition_ids: partition_info.parent_partition_ids,
466                    stats: partition_info
467                        .partition_stats
468                        .map(|x| x.into())
469                        .unwrap_or_default(),
470                    consumer_stats: partition_info
471                        .partition_consumer_stats
472                        .map(|stats| PartitionConsumerStats {
473                            committed_offset: stats.committed_offset,
474                            last_read_time: stats
475                                .last_read_time
476                                .map(|x| x.into())
477                                .unwrap_or_else(|| std::time::SystemTime::UNIX_EPOCH),
478                            max_read_time_lag: stats
479                                .max_read_time_lag
480                                .map(|x| x.into())
481                                .unwrap_or_default(),
482                            max_write_time_lag: stats
483                                .max_write_time_lag
484                                .map(|x| x.into())
485                                .unwrap_or_default(),
486                            max_committed_time_lag: std::time::Duration::from_secs(0),
487                            bytes_read_per_minute: stats
488                                .bytes_read
489                                .as_ref()
490                                .map(|b| b.per_minute)
491                                .unwrap_or_default(),
492                            bytes_read_per_hour: stats
493                                .bytes_read
494                                .as_ref()
495                                .map(|b| b.per_hour)
496                                .unwrap_or_default(),
497                            bytes_read_per_day: stats
498                                .bytes_read
499                                .as_ref()
500                                .map(|b| b.per_day)
501                                .unwrap_or_default(),
502                        })
503                        .unwrap_or_default(),
504                    location: partition_info
505                        .partition_location
506                        .map(|x| x.into())
507                        .unwrap_or_default(),
508                }
509            })
510            .collect();
511
512        Self {
513            path: value.self_.name,
514            consumer,
515            consumer_stats,
516            partitions,
517        }
518    }
519}