crabka_client_admin/
log_dirs.rs1use std::collections::BTreeMap;
9
10use crabka_protocol::owned::{
11 alter_replica_log_dirs_request::{
12 AlterReplicaLogDir, AlterReplicaLogDirTopic, AlterReplicaLogDirsRequest,
13 },
14 describe_log_dirs_request::{DescribableLogDirTopic, DescribeLogDirsRequest},
15};
16
17use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
18
19#[derive(Debug, Clone)]
21pub struct AlterReplicaLogDirOutcome {
22 pub topic: String,
23 pub partition: i32,
24 pub error: Option<KafkaError>,
25}
26
27#[derive(Debug, Clone)]
29pub struct LogDirInfo {
30 pub log_dir: String,
31 pub error: Option<KafkaError>,
32 pub topics: Vec<LogDirTopicInfo>,
33}
34
35#[derive(Debug, Clone)]
36pub struct LogDirTopicInfo {
37 pub name: String,
38 pub partitions: Vec<LogDirPartitionInfo>,
39}
40
41#[derive(Debug, Clone)]
42pub struct LogDirPartitionInfo {
43 pub partition_index: i32,
44 pub partition_size: i64,
45 pub offset_lag: i64,
46 pub is_future_key: bool,
47}
48
49impl AdminClient {
50 pub async fn alter_replica_log_dirs(
56 &mut self,
57 assignments: &BTreeMap<String, Vec<(String, Vec<i32>)>>,
58 ) -> Result<Vec<AlterReplicaLogDirOutcome>, AdminError> {
59 let dirs = assignments
60 .iter()
61 .map(|(path, topics)| AlterReplicaLogDir {
62 path: path.clone(),
63 topics: topics
64 .iter()
65 .map(|(name, partitions)| AlterReplicaLogDirTopic {
66 name: name.clone(),
67 partitions: partitions.clone(),
68 ..Default::default()
69 })
70 .collect(),
71 ..Default::default()
72 })
73 .collect();
74 let req = AlterReplicaLogDirsRequest {
75 dirs,
76 ..Default::default()
77 };
78 let resp = self.conn.send(req).await?;
79
80 let mut out = Vec::new();
81 for topic in resp.results {
82 for p in topic.partitions {
83 let error = if p.error_code == 0 {
84 None
85 } else {
86 Some(KafkaError {
87 code: p.error_code,
88 name: kafka_error_name(p.error_code),
89 message: None,
90 })
91 };
92 out.push(AlterReplicaLogDirOutcome {
93 topic: topic.topic_name.clone(),
94 partition: p.partition_index,
95 error,
96 });
97 }
98 }
99 Ok(out)
100 }
101
102 pub async fn describe_log_dirs(
108 &mut self,
109 filter: Option<&BTreeMap<String, Vec<i32>>>,
110 ) -> Result<Vec<LogDirInfo>, AdminError> {
111 let topics = filter.map(|f| {
112 f.iter()
113 .map(|(name, partitions)| DescribableLogDirTopic {
114 topic: name.clone(),
115 partitions: partitions.clone(),
116 ..Default::default()
117 })
118 .collect()
119 });
120 let req = DescribeLogDirsRequest {
121 topics,
122 ..Default::default()
123 };
124 let resp = self.conn.send(req).await?;
125
126 let mut out = Vec::new();
127 for result in resp.results {
128 let error = if result.error_code == 0 {
129 None
130 } else {
131 Some(KafkaError {
132 code: result.error_code,
133 name: kafka_error_name(result.error_code),
134 message: None,
135 })
136 };
137 let topics = result
138 .topics
139 .into_iter()
140 .map(|t| LogDirTopicInfo {
141 name: t.name,
142 partitions: t
143 .partitions
144 .into_iter()
145 .map(|p| LogDirPartitionInfo {
146 partition_index: p.partition_index,
147 partition_size: p.partition_size,
148 offset_lag: p.offset_lag,
149 is_future_key: p.is_future_key,
150 })
151 .collect(),
152 })
153 .collect();
154 out.push(LogDirInfo {
155 log_dir: result.log_dir,
156 error,
157 topics,
158 });
159 }
160 Ok(out)
161 }
162}