use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crabka_log::{Log, LogConfig};
use dashmap::DashMap;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use crate::error::BrokerError;
use crate::log_dir;
use crate::partition::{Partition, SwapOutcome, WriterMessage};
use crate::partition_registry::PartitionRegistry;
/// Bookkeeping for one in-flight replica move into another log directory.
///
/// Entries live in the shared `future_logs` map keyed by `(topic, partition)` while a
/// background replicator copies the partition's data into the "future" log.
// Not every field is read by the crate yet; silence dead-code warnings for the struct.
#[derive(Debug)]
#[allow(dead_code)]
pub struct FutureLogState {
    /// Configured log directory the partition is being moved into.
    pub target_log_dir: PathBuf,
    /// On-disk directory of the future log (under `target_log_dir`).
    pub future_path: PathBuf,
    /// Future log receiving copied batches; handed to the partition writer for the swap.
    pub future_log: Arc<Mutex<Log>>,
    /// Token used to stop the background replicator.
    pub cancel: CancellationToken,
    /// Handle of the background replicator task, when one has been recorded.
    pub task: std::sync::Mutex<Option<JoinHandle<()>>>,
}
/// Reasons a request to move a replica between log directories is rejected or fails.
#[derive(Debug)]
pub enum MoveError {
    /// The requested target is not one of the broker's configured log directories.
    LogDirNotFound,
    /// The partition is not present in this broker's partition registry.
    ReplicaNotAvailable,
    /// A move of the same partition to a *different* directory is already in flight.
    AlreadyMoving,
    /// Filesystem or log-layer failure while preparing the move.
    // The payload is carried for `Debug` output only, hence the dead-code allowance.
    Storage(#[allow(dead_code)] BrokerError),
}
impl From<BrokerError> for MoveError {
    /// Wraps a broker error as a storage failure.
    fn from(err: BrokerError) -> Self {
        Self::Storage(err)
    }
}
impl From<crabka_log::LogError> for MoveError {
    /// Lifts a log-layer error through `BrokerError` into a storage failure.
    fn from(err: crabka_log::LogError) -> Self {
        Self::Storage(err.into())
    }
}
impl From<std::io::Error> for MoveError {
    /// Lifts an I/O error through `BrokerError` into a storage failure.
    fn from(err: std::io::Error) -> Self {
        Self::Storage(err.into())
    }
}
/// Byte budget passed to each source-log read during a catch-up step (1 MiB).
const MOVE_READ_CHUNK_BYTES: usize = 1024 * 1024;
/// Back-off delay used by the future-log replicator between retries.
const MOVE_RETRY_BACKOFF: Duration = Duration::from_millis(50);
/// Begins moving the replica of `topic`/`partition` into `target_log_dir`.
///
/// `target_log_dir` must resolve (by canonical path, when it exists) to one of
/// `all_log_dirs`. The call succeeds without doing anything when the partition already
/// lives in that directory, or when a move to that same directory is already registered
/// in `future_logs`. Otherwise the future-log directory is created under the target, a
/// future log is opened there, and a background replicator is spawned to fill it.
///
/// NOTE(review): the "already moving" check and the registration done by `spawn_move`
/// are separate steps, so concurrent calls for the same partition are not serialized
/// here — confirm callers serialize them.
///
/// # Errors
///
/// - [`MoveError::LogDirNotFound`] if `target_log_dir` is not a configured log directory.
/// - [`MoveError::ReplicaNotAvailable`] if the partition is not in `partitions`.
/// - [`MoveError::AlreadyMoving`] if a move to a different directory is registered.
/// - [`MoveError::Storage`] if the future-log directory cannot be created or opened.
pub fn start_move(
    partitions: &Arc<PartitionRegistry>,
    future_logs: &Arc<DashMap<(String, i32), Arc<FutureLogState>>>,
    all_log_dirs: &[PathBuf],
    log_config: &LogConfig,
    topic: &str,
    partition: i32,
    target_log_dir: &Path,
) -> Result<(), MoveError> {
    // Match against the configured directories by canonical path so spelling
    // differences (trailing separators, symlinks, `..`) still resolve.
    let target_canon = canonicalize_or_self(target_log_dir);
    let Some(target_log_dir) = all_log_dirs
        .iter()
        .find(|d| canonicalize_or_self(d) == target_canon)
        .cloned()
    else {
        return Err(MoveError::LogDirNotFound);
    };
    // From here on `target_canon` is the canonical form of the configured
    // `target_log_dir` (that is what `find` matched on), so it is reused below
    // instead of canonicalizing the same path again.
    let key = (topic.to_string(), partition);
    let part = partitions
        .get(topic, partition)
        .ok_or(MoveError::ReplicaNotAvailable)?;

    // Already hosted in the requested directory: nothing to move.
    let current_log_dir = part.log_dir.load_full();
    if canonicalize_or_self(&current_log_dir) == target_canon {
        return Ok(());
    }

    // Clone the Arc out so the DashMap read guard is dropped before `spawn_move`
    // inserts into the same map.
    if let Some(existing) = future_logs.get(&key).map(|e| e.value().clone()) {
        if canonicalize_or_self(&existing.target_log_dir) == target_canon {
            // Same target already in flight: treat the request as idempotent.
            return Ok(());
        }
        return Err(MoveError::AlreadyMoving);
    }

    let future_path = log_dir::future_partition_dir(&target_log_dir, topic, partition);
    std::fs::create_dir_all(&future_path)?;
    let future_log = Arc::new(Mutex::new(Log::open(&future_path, log_config.clone())?));
    spawn_move(
        partitions,
        future_logs,
        &target_log_dir,
        future_path,
        future_log,
        topic,
        partition,
        &part,
    );
    Ok(())
}
/// Reopens the future log of `topic`/`partition` under `target_log_dir` and spawns a
/// replicator to continue filling it.
///
/// Unlike [`start_move`], this does no target validation, no "already there" or
/// "already moving" checks, and does not itself create the future-log directory.
///
/// # Errors
///
/// - [`MoveError::ReplicaNotAvailable`] if the partition is not in `partitions`.
/// - [`MoveError::Storage`] if the future log cannot be opened.
pub fn resume_move(
    partitions: &Arc<PartitionRegistry>,
    future_logs: &Arc<DashMap<(String, i32), Arc<FutureLogState>>>,
    target_log_dir: &Path,
    log_config: &LogConfig,
    topic: &str,
    partition: i32,
) -> Result<(), MoveError> {
    let Some(part) = partitions.get(topic, partition) else {
        return Err(MoveError::ReplicaNotAvailable);
    };
    let future_path = log_dir::future_partition_dir(target_log_dir, topic, partition);
    let reopened = Log::open(&future_path, log_config.clone())?;
    spawn_move(
        partitions,
        future_logs,
        target_log_dir,
        future_path,
        Arc::new(Mutex::new(reopened)),
        topic,
        partition,
        &part,
    );
    Ok(())
}
/// Registers a [`FutureLogState`] for `topic`/`partition` in `future_logs` and spawns the
/// background [`replicator_loop`] that fills `future_log` and requests the swap.
///
/// An existing entry for the same key is replaced; its task is not cancelled here.
// All arguments are state the detached task must own; a struct would exist only for
// this single call.
#[allow(clippy::too_many_arguments)]
fn spawn_move(
    partitions: &Arc<PartitionRegistry>,
    future_logs: &Arc<DashMap<(String, i32), Arc<FutureLogState>>>,
    target_log_dir: &Path,
    future_path: PathBuf,
    future_log: Arc<Mutex<Log>>,
    topic: &str,
    partition: i32,
    part: &Arc<Partition>,
) {
    let cancel = CancellationToken::new();
    let state = Arc::new(FutureLogState {
        target_log_dir: target_log_dir.to_path_buf(),
        future_path: future_path.clone(),
        future_log: future_log.clone(),
        cancel: cancel.clone(),
        // Filled in right after the task is spawned (see below).
        task: std::sync::Mutex::new(None),
    });
    // Publish the state BEFORE spawning. The replicator removes its entry when it
    // exits, and on a multi-threaded runtime it can exit (e.g. the writer channel is
    // already closed) before this function continues. Inserting afterwards would leave
    // a stale entry that makes every later `start_move` report an in-flight move.
    future_logs.insert((topic.to_string(), partition), Arc::clone(&state));

    let target_partition_path = log_dir::partition_dir(target_log_dir, topic, partition);
    let task = tokio::spawn(replicator_loop(
        part.clone(),
        future_log,
        future_path,
        target_partition_path,
        target_log_dir.to_path_buf(),
        cancel,
        partitions.clone(),
        future_logs.clone(),
        topic.to_string(),
        partition,
    ));
    *state
        .task
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(task);
}
/// Background task that copies `part`'s log into `future_log` and then asks the
/// partition writer to swap the future log in.
///
/// Each iteration copies one chunk via [`catch_up`]. Once the future log reports
/// caught up, a [`WriterMessage::SwapFutureLog`] is sent to the writer and its
/// [`SwapOutcome`] awaited. The task ends when:
/// - the swap succeeds or fails,
/// - the writer channel or the ack is gone, or
/// - `cancel` fires.
///
/// On exit it unregisters its own entry from `future_logs`. Catch-up errors are logged
/// and retried after [`MOVE_RETRY_BACKOFF`]. `_partitions` is currently unused.
// All arguments are owned state for the detached task; see `spawn_move`.
#[allow(clippy::too_many_arguments)]
async fn replicator_loop(
    part: Arc<Partition>,
    future_log: Arc<Mutex<Log>>,
    future_path: PathBuf,
    target_partition_path: PathBuf,
    target_log_dir: PathBuf,
    cancel: CancellationToken,
    _partitions: Arc<PartitionRegistry>,
    future_logs: Arc<DashMap<(String, i32), Arc<FutureLogState>>>,
    topic: String,
    partition: i32,
) {
    debug!(
        topic = %topic, partition,
        target = %target_log_dir.display(),
        "future-log replicator started"
    );
    loop {
        if cancel.is_cancelled() {
            break;
        }
        let advance = match catch_up(&part, &future_log) {
            Ok(v) => v,
            Err(e) => {
                warn!(
                    topic = %topic, partition,
                    error = %e,
                    "future-log replicator catch-up failed; retrying"
                );
                tokio::select! {
                    () = cancel.cancelled() => break,
                    () = tokio::time::sleep(MOVE_RETRY_BACKOFF) => continue,
                }
            }
        };
        if !advance.caught_up {
            // `catch_up` is synchronous (std mutexes + log I/O) and this path otherwise
            // never awaits; yield between chunks so a long copy does not monopolise
            // this runtime worker thread.
            tokio::task::yield_now().await;
            continue;
        }
        let (ack_tx, ack_rx) = oneshot::channel();
        let send = part
            .writer_tx
            .send(WriterMessage::SwapFutureLog {
                target_log_dir: target_log_dir.clone(),
                future_log: future_log.clone(),
                future_path: future_path.clone(),
                target_partition_path: target_partition_path.clone(),
                ack: ack_tx,
            })
            .await;
        if send.is_err() {
            warn!(
                topic = %topic, partition,
                "future-log replicator: partition writer is dead; aborting move"
            );
            break;
        }
        match ack_rx.await {
            Ok(Ok(SwapOutcome::Swapped)) => {
                debug!(topic = %topic, partition, "future-log swap complete");
                break;
            }
            Ok(Ok(SwapOutcome::NotCaughtUp)) => {
                // New data reached the source log before the writer handled the swap;
                // fall through, wait for data, and copy again.
            }
            Ok(Err(e)) => {
                warn!(
                    topic = %topic, partition,
                    error = %e,
                    "future-log swap failed; aborting move (partition continues on source dir)"
                );
                break;
            }
            Err(_) => {
                warn!(topic = %topic, partition, "future-log swap ack dropped");
                break;
            }
        }
        // The appends that caused `NotCaughtUp` may have signalled `append_notify` while
        // we were awaiting the ack, i.e. before we started listening. Depending on how
        // it is signalled (not visible here) that wake-up can be lost, which would stall
        // the move on an idle partition. The sleep bounds the wait.
        tokio::select! {
            () = cancel.cancelled() => break,
            () = part.append_notify.notified() => {}
            () = tokio::time::sleep(MOVE_RETRY_BACKOFF) => {}
        }
    }
    // Unregister only if the entry still belongs to THIS move; a newer move that
    // replaced the entry for the same partition must stay registered.
    future_logs.remove_if(&(topic, partition), |_, state| {
        Arc::ptr_eq(&state.future_log, &future_log)
    });
}
/// Outcome of a single [`catch_up`] step.
struct CatchUpProgress {
    /// `true` when nothing was copied this step: either the future log's end offset is
    /// already at or past the source's, or the source read returned no batches.
    caught_up: bool,
}
/// Copies one chunk from the partition's live log into `future_log`, starting at the
/// future log's current end offset.
///
/// Returns `caught_up: true` without copying when the future log's end offset is
/// already >= the partition's, or when a source read of [`MOVE_READ_CHUNK_BYTES`]
/// yields no batches. Otherwise it appends the batches to the future log at their
/// original base offsets and returns `caught_up: false`.
///
/// # Errors
///
/// Propagates failures from reading the source log or appending to the future log.
fn catch_up(
    part: &Arc<Partition>,
    future_log: &Arc<Mutex<Log>>,
) -> Result<CatchUpProgress, BrokerError> {
    use std::sync::PoisonError;

    let source_end = part.log_end_offset();
    let copy_from = future_log
        .lock()
        .unwrap_or_else(PoisonError::into_inner)
        .log_end_offset();
    if copy_from >= source_end {
        return Ok(CatchUpProgress { caught_up: true });
    }

    // Hold only the source lock while reading; it is released before the future log
    // is locked for appending.
    let chunk = {
        let source = part.log.lock().unwrap_or_else(PoisonError::into_inner);
        source.read(copy_from, MOVE_READ_CHUNK_BYTES)?
    };
    if chunk.batches.is_empty() {
        return Ok(CatchUpProgress { caught_up: true });
    }

    let mut dest = future_log.lock().unwrap_or_else(PoisonError::into_inner);
    for mut batch in chunk.batches {
        let at = batch.base_offset;
        dest.append_at(&mut batch, at).map_err(BrokerError::from)?;
    }
    Ok(CatchUpProgress { caught_up: false })
}
/// Returns the canonical, absolute form of `p`, or `p` unchanged when it cannot be
/// canonicalized (for example because it does not exist).
fn canonicalize_or_self(p: &Path) -> PathBuf {
    match std::fs::canonicalize(p) {
        Ok(resolved) => resolved,
        Err(_) => p.to_path_buf(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use tempfile::tempdir;

    /// Shape of the shared future-log map taken by `start_move`.
    type FutureLogs = Arc<DashMap<(String, i32), Arc<FutureLogState>>>;

    /// A fresh, empty partition registry and future-log map.
    fn empty_registries() -> (Arc<PartitionRegistry>, FutureLogs) {
        (Arc::new(PartitionRegistry::new()), Arc::new(DashMap::new()))
    }

    /// Creates partition `topic`/`partition` on disk under `log_dir` and spawns it via the
    /// broker. Callers run under `#[tokio::test]`.
    fn fixture_partition(log_dir: &Path, topic: &str, partition: i32) -> Arc<Partition> {
        let part_dir = log_dir::partition_dir(log_dir, topic, partition);
        std::fs::create_dir_all(&part_dir).unwrap();
        let log = Log::open(&part_dir, LogConfig::default()).unwrap();
        crate::broker::spawn_partition(
            topic.to_string(),
            partition,
            log_dir.to_path_buf(),
            log,
            crate::log_dir_status::LogDirRegistry::default(),
        )
    }

    /// Registers partition `t`/0, hosted in `home`, in `partitions`.
    fn register_t0(partitions: &Arc<PartitionRegistry>, home: &Path) {
        partitions.insert("t".to_string(), 0, fixture_partition(home, "t", 0));
    }

    /// Pre-registers a task-less future log for `t`/0 targeting `target`, simulating a
    /// move that is already in flight.
    fn register_inflight_move(future_logs: &FutureLogs, target: &Path) {
        let future_path = log_dir::future_partition_dir(target, "t", 0);
        std::fs::create_dir_all(&future_path).unwrap();
        let future_log = Arc::new(Mutex::new(
            Log::open(&future_path, LogConfig::default()).unwrap(),
        ));
        future_logs.insert(
            ("t".to_string(), 0),
            Arc::new(FutureLogState {
                target_log_dir: target.to_path_buf(),
                future_path,
                future_log,
                cancel: CancellationToken::new(),
                task: std::sync::Mutex::new(None),
            }),
        );
    }

    /// Calls `start_move` for `t`/0 with the default log config.
    fn move_t0(
        partitions: &Arc<PartitionRegistry>,
        future_logs: &FutureLogs,
        log_dirs: &[PathBuf],
        target: &Path,
    ) -> Result<(), MoveError> {
        start_move(
            partitions,
            future_logs,
            log_dirs,
            &LogConfig::default(),
            "t",
            0,
            target,
        )
    }

    #[test]
    fn move_error_log_dir_not_found_when_target_unknown() {
        let (partitions, future_logs) = empty_registries();
        let bogus = tempdir().unwrap();
        let err = move_t0(&partitions, &future_logs, &[], bogus.path())
            .expect_err("expected LogDirNotFound");
        assert!(matches!(err, MoveError::LogDirNotFound));
    }

    #[test]
    fn move_error_replica_not_available_when_partition_missing() {
        let (partitions, future_logs) = empty_registries();
        let dir = tempdir().unwrap();
        let err = move_t0(
            &partitions,
            &future_logs,
            &[dir.path().to_path_buf()],
            dir.path(),
        )
        .expect_err("expected ReplicaNotAvailable");
        assert!(matches!(err, MoveError::ReplicaNotAvailable));
    }

    #[tokio::test]
    async fn start_move_to_current_dir_is_noop() {
        let primary = tempdir().unwrap();
        let extra = tempdir().unwrap();
        let log_dirs = [primary.path().to_path_buf(), extra.path().to_path_buf()];
        let (partitions, future_logs) = empty_registries();
        register_t0(&partitions, primary.path());
        move_t0(&partitions, &future_logs, &log_dirs, primary.path())
            .expect("noop should succeed");
        assert!(
            future_logs.is_empty(),
            "noop must not register a future log"
        );
    }

    #[tokio::test]
    async fn start_move_idempotent_for_same_target() {
        let primary = tempdir().unwrap();
        let extra = tempdir().unwrap();
        let log_dirs = [primary.path().to_path_buf(), extra.path().to_path_buf()];
        let (partitions, future_logs) = empty_registries();
        register_t0(&partitions, primary.path());
        register_inflight_move(&future_logs, extra.path());
        move_t0(&partitions, &future_logs, &log_dirs, extra.path())
            .expect("same-target alter must be idempotent");
        assert!(future_logs.len() == 1);
    }

    #[tokio::test]
    async fn start_move_rejects_conflicting_target() {
        let primary = tempdir().unwrap();
        let extra = tempdir().unwrap();
        let third = tempdir().unwrap();
        let log_dirs = [
            primary.path().to_path_buf(),
            extra.path().to_path_buf(),
            third.path().to_path_buf(),
        ];
        let (partitions, future_logs) = empty_registries();
        register_t0(&partitions, primary.path());
        register_inflight_move(&future_logs, extra.path());
        let err = move_t0(&partitions, &future_logs, &log_dirs, third.path())
            .expect_err("conflicting-target alter must reject");
        assert!(matches!(err, MoveError::AlreadyMoving));
    }
}