1use crate::grpc_wrapper::raw_topic_service::common::codecs::{RawCodec, RawSupportedCodecs};
2use crate::grpc_wrapper::raw_topic_service::common::consumer::{RawAlterConsumer, RawConsumer};
3use crate::grpc_wrapper::raw_topic_service::common::metering_mode::RawMeteringMode;
4use crate::grpc_wrapper::raw_topic_service::common::partition::{
5 RawPartitionInfo, RawPartitionLocation, RawPartitionStats,
6};
7use crate::grpc_wrapper::raw_topic_service::common::partitioning_settings::RawPartitioningSettings;
8use crate::grpc_wrapper::raw_topic_service::common::topic::RawTopicStats;
9use crate::grpc_wrapper::raw_topic_service::describe_topic::RawDescribeTopicResult;
10use derive_builder::Builder;
11use std::collections::HashMap;
12use std::option::Option;
13use std::time::SystemTime;
14
15#[derive(Debug, Clone, Default, PartialEq, Eq)]
16pub struct Codec {
17 pub code: i32,
18}
19
20impl Codec {
21 pub const RAW: Codec = Codec { code: 1 };
22 pub const GZIP: Codec = Codec { code: 2 };
23 pub const LZOP: Codec = Codec { code: 3 };
24 pub const ZSTD: Codec = Codec { code: 4 };
25
26 pub fn is_custom(&self) -> bool {
27 self.code >= 10000 && self.code < 20000
28 }
29}
30
31impl From<RawCodec> for Codec {
32 fn from(value: RawCodec) -> Self {
33 Self { code: value.code }
34 }
35}
36
37impl From<Codec> for RawCodec {
38 fn from(value: Codec) -> Self {
39 Self { code: value.code }
40 }
41}
42
43impl From<RawSupportedCodecs> for Vec<Codec> {
44 fn from(value: RawSupportedCodecs) -> Vec<Codec> {
45 value.codecs.into_iter().map(Codec::from).collect()
46 }
47}
48
49impl From<Vec<Codec>> for RawSupportedCodecs {
50 fn from(value: Vec<Codec>) -> RawSupportedCodecs {
51 Self {
52 codecs: value.into_iter().map(RawCodec::from).collect(),
53 }
54 }
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub enum MeteringMode {
59 ReservedCapacity,
60 RequestUnits,
61}
62
63impl From<RawMeteringMode> for Option<MeteringMode> {
64 fn from(value: RawMeteringMode) -> Self {
65 match value {
66 RawMeteringMode::Unspecified => None,
67 RawMeteringMode::ReservedCapacity => Some(MeteringMode::ReservedCapacity),
68 RawMeteringMode::RequestUnits => Some(MeteringMode::RequestUnits),
69 }
70 }
71}
72
73impl From<Option<MeteringMode>> for RawMeteringMode {
74 fn from(value: Option<MeteringMode>) -> Self {
75 match value {
76 None => RawMeteringMode::Unspecified,
77 Some(MeteringMode::RequestUnits) => RawMeteringMode::RequestUnits,
78 Some(MeteringMode::ReservedCapacity) => RawMeteringMode::ReservedCapacity,
79 }
80 }
81}
82
83#[derive(Debug, Clone, Builder)]
84#[builder(build_fn(error = "crate::errors::YdbError"))]
85pub struct Consumer {
86 pub name: String,
87
88 #[builder(default)]
89 pub important: bool,
90
91 #[builder(default)]
92 pub read_from: Option<SystemTime>,
93
94 #[builder(default)]
95 pub supported_codecs: Vec<Codec>,
96
97 #[builder(default)]
98 pub attributes: HashMap<String, String>,
99
100 #[builder(default)]
101 pub consumer_stats: Option<ConsumerStats>,
102}
103
104impl From<RawConsumer> for Consumer {
105 fn from(consumer: RawConsumer) -> Self {
106 Self {
107 name: consumer.name,
108 important: consumer.important,
109 read_from: consumer.read_from.map(|x| x.into()),
110 supported_codecs: consumer.supported_codecs.into(),
111 attributes: consumer.attributes,
112 consumer_stats: None,
113 }
114 }
115}
116
117impl From<Consumer> for RawConsumer {
118 fn from(consumer: Consumer) -> Self {
119 Self {
120 name: consumer.name,
121 important: consumer.important,
122 read_from: consumer.read_from.map(|x| x.into()),
123 supported_codecs: consumer.supported_codecs.into(),
124 attributes: consumer.attributes,
125 consumer_stats: None,
126 }
127 }
128}
129
130#[derive(Debug, Clone, Builder)]
131#[builder(build_fn(error = "crate::errors::YdbError"))]
132pub struct AlterConsumer {
133 pub name: String,
134
135 #[builder(default)]
136 pub set_important: Option<bool>,
137
138 #[builder(default)]
139 pub set_read_from: Option<SystemTime>,
140
141 #[builder(default)]
142 pub set_supported_codecs: Option<Vec<Codec>>,
143
144 #[builder(default)]
145 pub alter_attributes: HashMap<String, String>,
146}
147
148impl From<AlterConsumer> for RawAlterConsumer {
149 fn from(consumer: AlterConsumer) -> Self {
150 Self {
151 name: consumer.name,
152 set_important: consumer.set_important,
153 set_read_from: consumer.set_read_from.map(|x| x.into()),
154 set_supported_codecs: consumer.set_supported_codecs.map(|x| x.into()),
155 alter_attributes: consumer.alter_attributes,
156 }
157 }
158}
159
160#[derive(Debug, Clone, Default)]
161pub struct PartitioningSettings {
162 pub min_active_partitions: i64,
163 pub partition_count_limit: i64,
164}
165
166impl From<RawPartitioningSettings> for PartitioningSettings {
167 fn from(value: RawPartitioningSettings) -> Self {
168 Self {
169 min_active_partitions: value.min_active_partitions,
170 partition_count_limit: value.partition_count_limit,
171 }
172 }
173}
174
175impl From<PartitioningSettings> for RawPartitioningSettings {
176 fn from(value: PartitioningSettings) -> Self {
177 Self {
178 min_active_partitions: value.min_active_partitions,
179 partition_count_limit: value.partition_count_limit,
180 }
181 }
182}
183
184#[derive(Debug, Clone)]
185pub struct PartitionStats {
186 pub start_offset: i64,
187 pub end_offset: i64,
188 pub store_size_bytes: i64,
189 pub last_write_time: SystemTime,
190 pub max_write_time_lag: std::time::Duration,
191 pub bytes_written_per_minute: i64,
192 pub bytes_written_per_hour: i64,
193 pub bytes_written_per_day: i64,
194}
195
196impl Default for PartitionStats {
197 fn default() -> Self {
198 Self {
199 start_offset: 0,
200 end_offset: 0,
201 store_size_bytes: 0,
202 last_write_time: SystemTime::UNIX_EPOCH,
203 max_write_time_lag: std::time::Duration::from_secs(0),
204 bytes_written_per_minute: 0,
205 bytes_written_per_hour: 0,
206 bytes_written_per_day: 0,
207 }
208 }
209}
210
211impl From<RawPartitionStats> for PartitionStats {
212 fn from(value: RawPartitionStats) -> Self {
213 Self {
214 start_offset: value.partition_offsets.start,
215 end_offset: value.partition_offsets.end,
216 store_size_bytes: value.store_size_bytes,
217 last_write_time: value.last_write_time.into(),
218 max_write_time_lag: value.max_write_time_lag.into(),
219 bytes_written_per_minute: value.bytes_written.per_minute,
220 bytes_written_per_hour: value.bytes_written.per_hour,
221 bytes_written_per_day: value.bytes_written.per_day,
222 }
223 }
224}
225
226#[derive(Debug, Clone, Default)]
227pub struct PartitionLocation {
228 pub node_id: i32,
229 pub generation: i64,
230}
231
232impl From<RawPartitionLocation> for PartitionLocation {
233 fn from(value: RawPartitionLocation) -> Self {
234 Self {
235 node_id: value.node_id,
236 generation: value.generation,
237 }
238 }
239}
240
241#[derive(Debug, Clone)]
243pub struct PartitionInfo {
244 pub partition_id: i64,
245 pub active: bool,
246 pub child_partition_ids: Vec<i64>,
247 pub parent_partition_ids: Vec<i64>,
248 pub stats: Option<PartitionStats>,
249 pub location: Option<PartitionLocation>,
250}
251
252impl From<RawPartitionInfo> for PartitionInfo {
253 fn from(value: RawPartitionInfo) -> Self {
254 Self {
255 partition_id: value.partition_id,
256 active: value.active,
257 child_partition_ids: value.child_partition_ids,
258 parent_partition_ids: value.parent_partition_ids,
259 stats: value.partition_stats.map(|x| x.into()),
260 location: value.partition_location.map(|x| x.into()),
261 }
262 }
263}
264
265#[derive(Debug, Clone)]
266pub struct TopicStats {
267 pub store_size_bytes: i64,
268 pub min_last_write_time: SystemTime,
269 pub max_write_time_lag: std::time::Duration,
270 pub bytes_written_per_minute: i64,
271 pub bytes_written_per_hour: i64,
272 pub bytes_written_per_day: i64,
273}
274
275impl From<RawTopicStats> for TopicStats {
276 fn from(value: RawTopicStats) -> Self {
277 Self {
278 store_size_bytes: value.store_size_bytes,
279 min_last_write_time: value.min_last_write_time.into(),
280 max_write_time_lag: value.max_write_time_lag.into(),
281 bytes_written_per_minute: value.bytes_written.per_minute,
282 bytes_written_per_hour: value.bytes_written.per_hour,
283 bytes_written_per_day: value.bytes_written.per_day,
284 }
285 }
286}
287
288#[derive(Debug, Clone)]
290pub struct TopicDescription {
291 pub path: String,
292 pub partitioning_settings: PartitioningSettings,
293 pub partitions: Vec<PartitionInfo>,
294 pub retention_period: std::time::Duration,
295 pub retention_storage_mb: Option<i64>,
296 pub supported_codecs: Vec<Codec>,
297 pub partition_write_speed_bytes_per_second: i64,
298 pub partition_total_read_speed_bytes_per_second: i64,
299 pub partition_consumer_read_speed_bytes_per_second: i64,
300 pub partition_write_burst_bytes: i64,
301 pub attributes: HashMap<String, String>,
302 pub consumers: Vec<Consumer>,
303 pub metering_mode: Option<MeteringMode>,
304 pub stats: Option<TopicStats>,
305}
306
307impl From<RawDescribeTopicResult> for TopicDescription {
308 fn from(value: RawDescribeTopicResult) -> Self {
309 let retention_storage_mb = if value.retention_storage_mb > 0 {
310 Some(value.retention_storage_mb)
311 } else {
312 None
313 };
314
315 Self {
316 path: value.self_.name,
317 partitioning_settings: value.partitioning_settings.into(),
318 partitions: value.partitions.into_iter().map(|x| x.into()).collect(),
319 retention_period: value.retention_period.into(),
320 retention_storage_mb,
321 supported_codecs: value
322 .supported_codecs
323 .codecs
324 .into_iter()
325 .map(|x| x.into())
326 .collect(),
327 partition_write_speed_bytes_per_second: value.partition_write_speed_bytes_per_second,
328 partition_total_read_speed_bytes_per_second: value
329 .partition_total_read_speed_bytes_per_second,
330 partition_consumer_read_speed_bytes_per_second: value
331 .partition_consumer_read_speed_bytes_per_second,
332 partition_write_burst_bytes: value.partition_write_burst_bytes,
333 attributes: value.attributes,
334 consumers: value.consumers.into_iter().map(|x| x.into()).collect(),
335 metering_mode: value.metering_mode.into(),
336 stats: value.topic_stats.map(|x| x.into()),
337 }
338 }
339}
340
341#[derive(Debug, Clone)]
342pub struct ConsumerStats {
343 pub min_partitions_last_read_time: std::time::SystemTime,
344 pub max_read_time_lag: std::time::Duration,
345 pub max_write_time_lag: std::time::Duration,
346 pub max_committed_time_lag: std::time::Duration,
347 pub bytes_read_per_minute: i64,
348 pub bytes_read_per_hour: i64,
349 pub bytes_read_per_day: i64,
350}
351
352#[derive(Debug, Clone)]
353pub struct PartitionConsumerStats {
354 pub committed_offset: i64,
355 pub last_read_time: std::time::SystemTime,
356 pub max_read_time_lag: std::time::Duration,
357 pub max_write_time_lag: std::time::Duration,
358 pub max_committed_time_lag: std::time::Duration,
359 pub bytes_read_per_minute: i64,
360 pub bytes_read_per_hour: i64,
361 pub bytes_read_per_day: i64,
362}
363
364impl Default for PartitionConsumerStats {
365 fn default() -> Self {
366 Self {
367 committed_offset: 0,
368 last_read_time: std::time::SystemTime::UNIX_EPOCH,
369 max_read_time_lag: std::time::Duration::from_secs(0),
370 max_write_time_lag: std::time::Duration::from_secs(0),
371 max_committed_time_lag: std::time::Duration::from_secs(0),
372 bytes_read_per_minute: 0,
373 bytes_read_per_hour: 0,
374 bytes_read_per_day: 0,
375 }
376 }
377}
378
379#[derive(Debug, Clone)]
380pub struct ConsumerPartitionInfo {
381 pub partition_id: i64,
382 pub active: bool,
383 pub child_partition_ids: Vec<i64>,
384 pub parent_partition_ids: Vec<i64>,
385 pub stats: PartitionStats,
386 pub consumer_stats: PartitionConsumerStats,
387 pub location: PartitionLocation,
388}
389
390#[derive(Debug, Clone)]
391pub struct ConsumerDescription {
392 pub path: String,
393 pub consumer: Consumer,
394 pub consumer_stats: ConsumerStats,
395 pub partitions: Vec<ConsumerPartitionInfo>,
396}
397
398impl From<crate::grpc_wrapper::raw_topic_service::describe_consumer::RawDescribeConsumerResult>
399 for ConsumerDescription
400{
401 fn from(
402 value: crate::grpc_wrapper::raw_topic_service::describe_consumer::RawDescribeConsumerResult,
403 ) -> Self {
404 let consumer_stats = value
405 .consumer
406 .consumer_stats
407 .as_ref()
408 .map(|stats| ConsumerStats {
409 min_partitions_last_read_time: stats
410 .min_partitions_last_read_time
411 .clone()
412 .map(|x| x.into())
413 .unwrap_or_else(|| std::time::SystemTime::UNIX_EPOCH),
414 max_read_time_lag: stats
415 .max_read_time_lag
416 .clone()
417 .map(|x| x.into())
418 .unwrap_or_default(),
419 max_write_time_lag: stats
420 .max_write_time_lag
421 .clone()
422 .map(|x| x.into())
423 .unwrap_or_default(),
424 max_committed_time_lag: stats
425 .max_committed_time_lag
426 .clone()
427 .map(|x| x.into())
428 .unwrap_or_default(),
429 bytes_read_per_minute: stats
430 .bytes_read
431 .as_ref()
432 .map(|b| b.per_minute)
433 .unwrap_or_default(),
434 bytes_read_per_hour: stats
435 .bytes_read
436 .as_ref()
437 .map(|b| b.per_hour)
438 .unwrap_or_default(),
439 bytes_read_per_day: stats
440 .bytes_read
441 .as_ref()
442 .map(|b| b.per_day)
443 .unwrap_or_default(),
444 })
445 .unwrap_or_else(|| ConsumerStats {
446 min_partitions_last_read_time: std::time::SystemTime::UNIX_EPOCH,
447 max_read_time_lag: std::time::Duration::from_secs(0),
448 max_write_time_lag: std::time::Duration::from_secs(0),
449 max_committed_time_lag: std::time::Duration::from_secs(0),
450 bytes_read_per_minute: 0,
451 bytes_read_per_hour: 0,
452 bytes_read_per_day: 0,
453 });
454 let consumer: Consumer = value.consumer.into();
455
456 let partitions = value
457 .partitions
458 .into_iter()
459 .map(|p| {
460 let partition_info: RawPartitionInfo = p;
461 ConsumerPartitionInfo {
462 partition_id: partition_info.partition_id,
463 active: partition_info.active,
464 child_partition_ids: partition_info.child_partition_ids,
465 parent_partition_ids: partition_info.parent_partition_ids,
466 stats: partition_info
467 .partition_stats
468 .map(|x| x.into())
469 .unwrap_or_default(),
470 consumer_stats: partition_info
471 .partition_consumer_stats
472 .map(|stats| PartitionConsumerStats {
473 committed_offset: stats.committed_offset,
474 last_read_time: stats
475 .last_read_time
476 .map(|x| x.into())
477 .unwrap_or_else(|| std::time::SystemTime::UNIX_EPOCH),
478 max_read_time_lag: stats
479 .max_read_time_lag
480 .map(|x| x.into())
481 .unwrap_or_default(),
482 max_write_time_lag: stats
483 .max_write_time_lag
484 .map(|x| x.into())
485 .unwrap_or_default(),
486 max_committed_time_lag: std::time::Duration::from_secs(0),
487 bytes_read_per_minute: stats
488 .bytes_read
489 .as_ref()
490 .map(|b| b.per_minute)
491 .unwrap_or_default(),
492 bytes_read_per_hour: stats
493 .bytes_read
494 .as_ref()
495 .map(|b| b.per_hour)
496 .unwrap_or_default(),
497 bytes_read_per_day: stats
498 .bytes_read
499 .as_ref()
500 .map(|b| b.per_day)
501 .unwrap_or_default(),
502 })
503 .unwrap_or_default(),
504 location: partition_info
505 .partition_location
506 .map(|x| x.into())
507 .unwrap_or_default(),
508 }
509 })
510 .collect();
511
512 Self {
513 path: value.self_.name,
514 consumer,
515 consumer_stats,
516 partitions,
517 }
518 }
519}