use std::collections::BTreeMap;
use crabka_protocol::owned::{
alter_replica_log_dirs_request::{
AlterReplicaLogDir, AlterReplicaLogDirTopic, AlterReplicaLogDirsRequest,
},
describe_log_dirs_request::{DescribableLogDirTopic, DescribeLogDirsRequest},
};
use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
/// Result for a single partition, as produced by
/// `AdminClient::alter_replica_log_dirs`.
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirOutcome {
/// Topic name copied from the response entry the partition was listed under.
pub topic: String,
/// Partition index copied from the response.
pub partition: i32,
/// `None` when the response carried error code `0` for this partition;
/// otherwise the code together with its resolved name (no message is set).
pub error: Option<KafkaError>,
}
/// One log-directory entry, as produced by `AdminClient::describe_log_dirs`.
#[derive(Debug, Clone)]
pub struct LogDirInfo {
/// Log directory string copied from the response
/// (presumably a path on the broker's filesystem — confirm).
pub log_dir: String,
/// `None` when the response carried error code `0` for this directory;
/// otherwise the code together with its resolved name (no message is set).
pub error: Option<KafkaError>,
/// Topics the response listed under this directory, in response order.
pub topics: Vec<LogDirTopicInfo>,
}
/// A topic listed under a log directory in a describe-log-dirs result.
#[derive(Debug, Clone)]
pub struct LogDirTopicInfo {
/// Topic name copied from the response.
pub name: String,
/// Partitions the response listed for this topic, in response order.
pub partitions: Vec<LogDirPartitionInfo>,
}
/// Per-partition figures listed under a topic in a describe-log-dirs result.
///
/// Every field is copied unchanged from the response. The type derives
/// `PartialEq`/`Eq` so callers and tests can compare entries directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogDirPartitionInfo {
    /// Partition index as reported in the response.
    pub partition_index: i32,
    /// Partition size as reported in the response
    /// (presumably bytes on disk — confirm against the protocol docs).
    pub partition_size: i64,
    /// Offset lag as reported in the response.
    pub offset_lag: i64,
    /// "Future" flag as reported in the response
    /// (presumably marks a replica still being moved to this directory — confirm).
    pub is_future_key: bool,
}
impl AdminClient {
pub async fn alter_replica_log_dirs(
&mut self,
assignments: &BTreeMap<String, Vec<(String, Vec<i32>)>>,
) -> Result<Vec<AlterReplicaLogDirOutcome>, AdminError> {
let dirs = assignments
.iter()
.map(|(path, topics)| AlterReplicaLogDir {
path: path.clone(),
topics: topics
.iter()
.map(|(name, partitions)| AlterReplicaLogDirTopic {
name: name.clone(),
partitions: partitions.clone(),
..Default::default()
})
.collect(),
..Default::default()
})
.collect();
let req = AlterReplicaLogDirsRequest {
dirs,
..Default::default()
};
let resp = self.conn.send(req).await?;
let mut out = Vec::new();
for topic in resp.results {
for p in topic.partitions {
let error = if p.error_code == 0 {
None
} else {
Some(KafkaError {
code: p.error_code,
name: kafka_error_name(p.error_code),
message: None,
})
};
out.push(AlterReplicaLogDirOutcome {
topic: topic.topic_name.clone(),
partition: p.partition_index,
error,
});
}
}
Ok(out)
}
pub async fn describe_log_dirs(
&mut self,
filter: Option<&BTreeMap<String, Vec<i32>>>,
) -> Result<Vec<LogDirInfo>, AdminError> {
let topics = filter.map(|f| {
f.iter()
.map(|(name, partitions)| DescribableLogDirTopic {
topic: name.clone(),
partitions: partitions.clone(),
..Default::default()
})
.collect()
});
let req = DescribeLogDirsRequest {
topics,
..Default::default()
};
let resp = self.conn.send(req).await?;
let mut out = Vec::new();
for result in resp.results {
let error = if result.error_code == 0 {
None
} else {
Some(KafkaError {
code: result.error_code,
name: kafka_error_name(result.error_code),
message: None,
})
};
let topics = result
.topics
.into_iter()
.map(|t| LogDirTopicInfo {
name: t.name,
partitions: t
.partitions
.into_iter()
.map(|p| LogDirPartitionInfo {
partition_index: p.partition_index,
partition_size: p.partition_size,
offset_lag: p.offset_lag,
is_future_key: p.is_future_key,
})
.collect(),
})
.collect();
out.push(LogDirInfo {
log_dir: result.log_dir,
error,
topics,
});
}
Ok(out)
}
}