1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
//! `AlterReplicaLogDirs` (`api_key=34`, KIP-113). Intra-broker
//! log-directory reassignment: move a hosted replica from one of this
//! broker's configured `log.dirs` to another, without re-replicating
//! from the leader. Backs `kafka-log-dirs --alter` and the
//! `--reassignment-json-file` log-dir overrides of
//! `kafka-reassign-partitions`.
//!
//! The handler validates each requested move, kicks off a per-move
//! background replicator task via [`crate::future_log::start_move`],
//! and responds immediately with a per-partition error code (`NONE`
//! when the move was started). The actual data copy and atomic
//! dir-rename happen in the background; clients poll `DescribeLogDirs`
//! and watch `is_future_key` flip from `true` to `false` to detect
//! completion.
//!
//! Authorization (Cluster.Alter) is enforced at
//! [`crate::network::dispatch::handle_alter_replica_log_dirs_frame`] —
//! this handler runs only after the principal has been authorized.
use std::collections::BTreeMap;
use std::path::PathBuf;
use bytes::{Bytes, BytesMut};
use futures_util::future::BoxFuture;
use crabka_protocol::owned::alter_replica_log_dirs_request::AlterReplicaLogDirsRequest;
use crabka_protocol::owned::alter_replica_log_dirs_response::{
AlterReplicaLogDirPartitionResult, AlterReplicaLogDirTopicResult, AlterReplicaLogDirsResponse,
};
use crabka_protocol::{Decode, Encode};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
use crate::future_log::{self, MoveError};
/// Handles one `AlterReplicaLogDirs` request frame.
///
/// Decodes `req_bytes` at the given protocol `version`, asks
/// [`future_log::start_move`] to begin moving every requested
/// `(topic, partition)` to its target log directory, and encodes an
/// `AlterReplicaLogDirsResponse` carrying one error code per partition.
///
/// The state needed from `broker` is cloned up front so the returned
/// future is `'static` and does not borrow the broker or the request
/// buffer.
///
/// # Duplicate partitions
///
/// The wire format lets a client list the same partition under several
/// target directories. Apache Kafka collapses the request into a
/// `partition -> dir` map in which the LAST occurrence wins, and only
/// then acts on it. This handler does the same: the request is
/// de-duplicated *before* any move is started, so exactly one
/// `start_move` call is made per distinct partition, targeting the
/// last directory listed for it.
///
/// # Errors
///
/// The future resolves to `Err` if the request cannot be decoded or the
/// response cannot be encoded. Per-partition move failures are not
/// errors of the future; they are reported as error codes in the
/// response body:
///
/// * `MoveError::LogDirNotFound` -> `LOG_DIR_NOT_FOUND`
/// * `MoveError::ReplicaNotAvailable` -> `REPLICA_NOT_AVAILABLE`
/// * `MoveError::AlreadyMoving` / `MoveError::Storage(_)` -> `KAFKA_STORAGE_ERROR`
pub(crate) fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
) -> BoxFuture<'static, Result<Bytes, BrokerError>> {
    // Take owned copies of everything the future touches so it can be
    // `'static`.
    let req_bytes = req_bytes.to_vec();
    let partitions = broker.partitions.clone();
    let future_logs = broker.future_logs.clone();
    let all_log_dirs = broker.config.all_log_dirs();
    let log_config = broker.config.log_config.clone();
    Box::pin(async move {
        let mut cur: &[u8] = &req_bytes;
        let req = AlterReplicaLogDirsRequest::decode(&mut cur, version)?;

        // Pass 1: resolve (topic, partition) -> target dir with
        // last-occurrence-wins semantics. Resolving duplicates here,
        // before any move is started, keeps an earlier (superseded)
        // target from actually being acted on.
        let mut targets: BTreeMap<(String, i32), PathBuf> = BTreeMap::new();
        for dir in req.dirs {
            let target_path = PathBuf::from(&dir.path);
            for topic in dir.topics {
                for partition_index in topic.partitions {
                    targets.insert((topic.name.clone(), partition_index), target_path.clone());
                }
            }
        }

        // Pass 2: start one move per distinct partition and group the
        // outcomes into the response's per-topic shape. Iterating the
        // BTreeMap yields entries sorted by (topic, partition), so the
        // response is ordered deterministically.
        let mut by_topic: BTreeMap<String, Vec<AlterReplicaLogDirPartitionResult>> =
            BTreeMap::new();
        for ((topic_name, partition_index), target_path) in targets {
            let error_code = match future_log::start_move(
                &partitions,
                &future_logs,
                &all_log_dirs,
                &log_config,
                &topic_name,
                partition_index,
                &target_path,
            ) {
                Ok(()) => codes::NONE,
                Err(MoveError::LogDirNotFound) => codes::LOG_DIR_NOT_FOUND,
                Err(MoveError::ReplicaNotAvailable) => codes::REPLICA_NOT_AVAILABLE,
                // Both variants are reported to the client as a storage
                // error.
                Err(MoveError::AlreadyMoving | MoveError::Storage(_)) => {
                    codes::KAFKA_STORAGE_ERROR
                }
            };
            by_topic
                .entry(topic_name)
                .or_default()
                .push(AlterReplicaLogDirPartitionResult {
                    partition_index,
                    error_code,
                    ..Default::default()
                });
        }

        let results: Vec<_> = by_topic
            .into_iter()
            .map(|(topic_name, partitions)| AlterReplicaLogDirTopicResult {
                topic_name,
                partitions,
                ..Default::default()
            })
            .collect();
        let resp = AlterReplicaLogDirsResponse {
            throttle_time_ms: 0,
            results,
            ..Default::default()
        };

        let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
        resp.encode(&mut buf, version)?;
        Ok(buf.freeze())
    })
}