Skip to main content

crabka_client_admin/
log_dirs.rs

//! KIP-113 admin RPCs: `AlterReplicaLogDirs` (`api_key` 34) and
//! `DescribeLogDirs` (`api_key` 35).
//!
//! Both target the broker the connection is open against — these are
//! per-broker calls, so the admin client does NOT do a controller
//! retry on `NOT_CONTROLLER` (the request doesn't hit the controller).
7
8use std::collections::BTreeMap;
9
10use crabka_protocol::owned::{
11    alter_replica_log_dirs_request::{
12        AlterReplicaLogDir, AlterReplicaLogDirTopic, AlterReplicaLogDirsRequest,
13    },
14    describe_log_dirs_request::{DescribableLogDirTopic, DescribeLogDirsRequest},
15};
16
17use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
18
19/// One row of an `AlterReplicaLogDirs` result.
20#[derive(Debug, Clone)]
21pub struct AlterReplicaLogDirOutcome {
22    pub topic: String,
23    pub partition: i32,
24    pub error: Option<KafkaError>,
25}
26
27/// One log dir from a `DescribeLogDirs` response.
28#[derive(Debug, Clone)]
29pub struct LogDirInfo {
30    pub log_dir: String,
31    pub error: Option<KafkaError>,
32    pub topics: Vec<LogDirTopicInfo>,
33}
34
35#[derive(Debug, Clone)]
36pub struct LogDirTopicInfo {
37    pub name: String,
38    pub partitions: Vec<LogDirPartitionInfo>,
39}
40
/// Per-partition details from a `DescribeLogDirs` response.
///
/// All fields are copied verbatim from the corresponding response entry.
#[derive(Clone, Debug)]
pub struct LogDirPartitionInfo {
    /// Index of the partition within its topic.
    pub partition_index: i32,
    /// Partition size as reported by the broker (presumably bytes — confirm
    /// against the protocol schema).
    pub partition_size: i64,
    /// Offset lag as reported by the broker.
    pub offset_lag: i64,
    /// Broker-reported flag; presumably marks an in-progress "future" log
    /// created by a replica move — confirm against the protocol schema.
    pub is_future_key: bool,
}
48
49impl AdminClient {
50    /// `AlterReplicaLogDirs` (KIP-113): move replicas between local
51    /// `log.dirs` on this broker.
52    ///
53    /// `assignments` maps each target absolute directory path to the
54    /// `(topic, [partition])` pairs that should be moved into it.
55    pub async fn alter_replica_log_dirs(
56        &mut self,
57        assignments: &BTreeMap<String, Vec<(String, Vec<i32>)>>,
58    ) -> Result<Vec<AlterReplicaLogDirOutcome>, AdminError> {
59        let dirs = assignments
60            .iter()
61            .map(|(path, topics)| AlterReplicaLogDir {
62                path: path.clone(),
63                topics: topics
64                    .iter()
65                    .map(|(name, partitions)| AlterReplicaLogDirTopic {
66                        name: name.clone(),
67                        partitions: partitions.clone(),
68                        ..Default::default()
69                    })
70                    .collect(),
71                ..Default::default()
72            })
73            .collect();
74        let req = AlterReplicaLogDirsRequest {
75            dirs,
76            ..Default::default()
77        };
78        let resp = self.conn.send(req).await?;
79
80        let mut out = Vec::new();
81        for topic in resp.results {
82            for p in topic.partitions {
83                let error = if p.error_code == 0 {
84                    None
85                } else {
86                    Some(KafkaError {
87                        code: p.error_code,
88                        name: kafka_error_name(p.error_code),
89                        message: None,
90                    })
91                };
92                out.push(AlterReplicaLogDirOutcome {
93                    topic: topic.topic_name.clone(),
94                    partition: p.partition_index,
95                    error,
96                });
97            }
98        }
99        Ok(out)
100    }
101
102    /// `DescribeLogDirs` (KIP-113): list every configured `log.dir` on
103    /// this broker, with the partitions each holds (current and
104    /// in-progress future logs). Pass `None` to fetch all partitions
105    /// or `Some` with topic → partitions filter (empty inner vec means
106    /// all partitions of that topic).
107    pub async fn describe_log_dirs(
108        &mut self,
109        filter: Option<&BTreeMap<String, Vec<i32>>>,
110    ) -> Result<Vec<LogDirInfo>, AdminError> {
111        let topics = filter.map(|f| {
112            f.iter()
113                .map(|(name, partitions)| DescribableLogDirTopic {
114                    topic: name.clone(),
115                    partitions: partitions.clone(),
116                    ..Default::default()
117                })
118                .collect()
119        });
120        let req = DescribeLogDirsRequest {
121            topics,
122            ..Default::default()
123        };
124        let resp = self.conn.send(req).await?;
125
126        let mut out = Vec::new();
127        for result in resp.results {
128            let error = if result.error_code == 0 {
129                None
130            } else {
131                Some(KafkaError {
132                    code: result.error_code,
133                    name: kafka_error_name(result.error_code),
134                    message: None,
135                })
136            };
137            let topics = result
138                .topics
139                .into_iter()
140                .map(|t| LogDirTopicInfo {
141                    name: t.name,
142                    partitions: t
143                        .partitions
144                        .into_iter()
145                        .map(|p| LogDirPartitionInfo {
146                            partition_index: p.partition_index,
147                            partition_size: p.partition_size,
148                            offset_lag: p.offset_lag,
149                            is_future_key: p.is_future_key,
150                        })
151                        .collect(),
152                })
153                .collect();
154            out.push(LogDirInfo {
155                log_dir: result.log_dir,
156                error,
157                topics,
158            });
159        }
160        Ok(out)
161    }
162}