use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use arc_swap::ArcSwap;
use crabka_log::Log;
use tokio::sync::{Notify, mpsc};
use crate::log_dir_status::LogDirRegistry;
use crate::partition::{ProduceJob, SwapOutcome, WriterMessage};
use crate::replica_state::ReplicaState;
fn flag_storage_failure(
err: &crate::error::BrokerError,
log_dir: &ArcSwap<PathBuf>,
log_dir_status: &LogDirRegistry,
) {
if let crate::error::BrokerError::Log(crabka_log::LogError::Io(io_err)) = err {
let dir = log_dir.load();
log_dir_status.mark_offline(&dir, &format!("partition write/fsync failed: {io_err}"));
}
}
/// Locks the partition log.
///
/// A poisoned mutex (a previous holder panicked) is recovered rather than
/// propagated, so one panicking operation does not wedge the partition.
fn lock_log(log: &Mutex<Log>) -> std::sync::MutexGuard<'_, Log> {
    match log.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
fn storage_failure_error(
context: &str,
join_err: &tokio::task::JoinError,
) -> crate::error::BrokerError {
let io = std::io::Error::other(format!("{context}: {join_err}"));
crate::error::BrokerError::Log(crabka_log::LogError::Io(io))
}
#[allow(clippy::too_many_lines)]
pub async fn run(
log: Arc<Mutex<Log>>,
log_dir: Arc<ArcSwap<PathBuf>>,
mut rx: mpsc::Receiver<WriterMessage>,
append_notify: Arc<Notify>,
replica_state: Arc<tokio::sync::Mutex<ReplicaState>>,
hw_advance_notify: Arc<Notify>,
log_dir_status: LogDirRegistry,
) {
while let Some(msg) = rx.recv().await {
match msg {
WriterMessage::Produce(ProduceJob { mut batch, ack }) => {
let target = lock_log(&log).config_snapshot().compression_type;
if let Some(target) = target {
let current = batch.attributes.compression();
if current != target {
batch.attributes = batch.attributes.with_compression(target);
}
}
let log_for_blocking = log.clone();
let join = tokio::task::spawn_blocking(move || {
let mut guard = lock_log(&log_for_blocking);
let result = guard
.append(&mut batch)
.map_err(crate::error::BrokerError::from);
let leo = result.as_ref().ok().map(|_| guard.log_end_offset());
(result, leo)
});
let (result, leo) = match join.await {
Ok(v) => v,
Err(join_err) => {
let err = storage_failure_error("append task panicked", &join_err);
flag_storage_failure(&err, &log_dir, &log_dir_status);
let _ = ack.send(Err(err));
continue;
}
};
let ok = result.is_ok();
if let Err(ref e) = result {
flag_storage_failure(e, &log_dir, &log_dir_status);
}
let _ = ack.send(result);
if ok {
append_notify.notify_waiters();
let leader_leo = leo.expect("LEO present on successful append");
let advanced = {
let mut st = replica_state.lock().await;
let prev = st.hw;
let new = st.recompute_hw_for_leader_append(leader_leo);
new > prev
};
if advanced {
hw_advance_notify.notify_waiters();
}
}
}
WriterMessage::Replicate { mut batch, ack } => {
let offset = batch.base_offset;
let log_for_blocking = log.clone();
let join = tokio::task::spawn_blocking(move || {
lock_log(&log_for_blocking)
.append_at(&mut batch, offset)
.map_err(crate::error::BrokerError::from)
});
let result = match join.await {
Ok(v) => v,
Err(join_err) => {
let err = storage_failure_error("replicate task panicked", &join_err);
flag_storage_failure(&err, &log_dir, &log_dir_status);
let _ = ack.send(Err(err));
continue;
}
};
let ok = result.is_ok();
if let Err(ref e) = result {
flag_storage_failure(e, &log_dir, &log_dir_status);
}
let _ = ack.send(result);
if ok {
append_notify.notify_waiters();
}
}
WriterMessage::Truncate { offset, ack } => {
let log_for_blocking = log.clone();
let join = tokio::task::spawn_blocking(move || {
lock_log(&log_for_blocking)
.truncate_to(offset)
.map_err(crate::error::BrokerError::from)
});
let result = match join.await {
Ok(v) => v,
Err(join_err) => {
let err = storage_failure_error("truncate task panicked", &join_err);
flag_storage_failure(&err, &log_dir, &log_dir_status);
let _ = ack.send(Err(err));
continue;
}
};
if let Err(ref e) = result {
flag_storage_failure(e, &log_dir, &log_dir_status);
}
let _ = ack.send(result);
}
WriterMessage::ResetTo { new_base, ack } => {
let log_for_blocking = log.clone();
let join = tokio::task::spawn_blocking(move || {
lock_log(&log_for_blocking)
.reset_to(new_base)
.map_err(crate::error::BrokerError::from)
});
let result = match join.await {
Ok(v) => v,
Err(join_err) => {
let err = storage_failure_error("reset_to task panicked", &join_err);
flag_storage_failure(&err, &log_dir, &log_dir_status);
let _ = ack.send(Err(err));
continue;
}
};
if let Err(ref e) = result {
flag_storage_failure(e, &log_dir, &log_dir_status);
}
let _ = ack.send(result);
}
WriterMessage::TrimToOffset { new_start, ack } => {
let log_for_blocking = log.clone();
let join = tokio::task::spawn_blocking(move || {
lock_log(&log_for_blocking)
.trim_to_offset(new_start)
.map_err(crate::error::BrokerError::from)
});
let result = match join.await {
Ok(v) => v,
Err(join_err) => {
let err = storage_failure_error("trim_to_offset task panicked", &join_err);
flag_storage_failure(&err, &log_dir, &log_dir_status);
let _ = ack.send(Err(err));
continue;
}
};
if let Err(ref e) = result {
flag_storage_failure(e, &log_dir, &log_dir_status);
}
let _ = ack.send(result);
}
WriterMessage::SetLogConfig { config, ack } => {
lock_log(&log).set_config(config);
let _ = ack.send(());
}
WriterMessage::Compact { ack } => {
let log_for_blocking = log.clone();
let join = tokio::task::spawn_blocking(move || {
lock_log(&log_for_blocking)
.compact()
.map_err(crate::error::BrokerError::from)
});
let result = match join.await {
Ok(v) => v,
Err(join_err) => {
let err = storage_failure_error("compact task panicked", &join_err);
flag_storage_failure(&err, &log_dir, &log_dir_status);
let _ = ack.send(Err(err));
continue;
}
};
if let Err(ref e) = result {
flag_storage_failure(e, &log_dir, &log_dir_status);
}
let _ = ack.send(result);
}
#[cfg(any(test, feature = "test-helpers"))]
WriterMessage::TestSetLogStart { new_start, ack } => {
let result = lock_log(&log)
.set_log_start_offset(new_start)
.map_err(crate::error::BrokerError::from);
let _ = ack.send(result);
}
WriterMessage::SwapFutureLog {
target_log_dir,
future_log,
future_path,
target_partition_path,
ack,
} => {
let result = swap_future_log(
&log,
&log_dir,
target_log_dir,
&future_log,
&future_path,
&target_partition_path,
);
let _ = ack.send(result);
}
}
}
}
fn swap_future_log(
log: &Arc<Mutex<Log>>,
log_dir: &Arc<ArcSwap<PathBuf>>,
target_log_dir: PathBuf,
future_log: &Arc<Mutex<Log>>,
future_path: &std::path::Path,
target_partition_path: &std::path::Path,
) -> Result<SwapOutcome, crate::error::BrokerError> {
let mut log_guard = lock_log(log);
let config = log_guard.config_snapshot();
let current_leo = log_guard.log_end_offset();
let mut future_guard = lock_log(future_log);
if future_guard.log_end_offset() < current_leo {
return Ok(SwapOutcome::NotCaughtUp);
}
let source_partition_path = log_guard.dir().to_path_buf();
let tomb_dir = future_path.with_extension("crabka-swap-tomb");
std::fs::create_dir_all(&tomb_dir)?;
let old_current = std::mem::replace(&mut *log_guard, Log::open(&tomb_dir, config.clone())?);
old_current.close();
let old_future = std::mem::replace(&mut *future_guard, Log::open(&tomb_dir, config.clone())?);
old_future.close();
drop(future_guard);
if let Err(e) = std::fs::rename(future_path, target_partition_path) {
match Log::open(&source_partition_path, config) {
Ok(reopened) => *log_guard = reopened,
Err(reopen_err) => {
tracing::error!(
error = %reopen_err,
"swap_future_log: rename failed AND source reopen failed; \
partition is offline until restart"
);
}
}
let _ = std::fs::remove_dir_all(&tomb_dir);
return Err(crate::error::BrokerError::from(e));
}
if let Err(e) = std::fs::remove_dir_all(&source_partition_path) {
tracing::warn!(
source = %source_partition_path.display(),
error = %e,
"swap_future_log: failed to remove source partition dir; \
partition is live at target, source will be cleaned on next restart"
);
}
let _ = std::fs::remove_dir_all(&tomb_dir);
*log_guard = Log::open(target_partition_path, config)?;
log_dir.store(Arc::new(target_log_dir));
Ok(SwapOutcome::Swapped)
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_log::LogConfig;
    use crabka_protocol::records::{Record, RecordBatch};
    use tempfile::{TempDir, tempdir};
    use tokio::sync::oneshot;

    /// Builds a batch of `n` default records with offset deltas `0..n`.
    fn sample_batch(n: i32) -> RecordBatch {
        let mut b = RecordBatch {
            last_offset_delta: n - 1,
            ..RecordBatch::default()
        };
        for i in 0..n {
            b.records.push(Record {
                offset_delta: i,
                ..Default::default()
            });
        }
        b
    }

    /// A writer task running over a fresh log in a temporary directory,
    /// together with the handles tests use to drive and observe it.
    struct Harness {
        /// Held so the log directory outlives the writer task.
        _dir: TempDir,
        log: Arc<Mutex<Log>>,
        tx: mpsc::Sender<WriterMessage>,
        append_notify: Arc<Notify>,
        replica_state: Arc<tokio::sync::Mutex<ReplicaState>>,
        hw_advance_notify: Arc<Notify>,
        writer: tokio::task::JoinHandle<()>,
    }

    impl Harness {
        /// Opens an empty log with the default config and spawns [`run`] over
        /// it. Must be called from inside a tokio runtime.
        fn start() -> Self {
            let dir = tempdir().expect("tempdir");
            let log = Arc::new(Mutex::new(
                Log::open(dir.path(), LogConfig::default()).expect("open log"),
            ));
            let (tx, rx) = mpsc::channel(1);
            let append_notify = Arc::new(Notify::new());
            let replica_state = Arc::new(tokio::sync::Mutex::new(ReplicaState::new()));
            let hw_advance_notify = Arc::new(Notify::new());
            let writer = tokio::spawn(run(
                Arc::clone(&log),
                Arc::new(ArcSwap::from_pointee(dir.path().to_path_buf())),
                rx,
                Arc::clone(&append_notify),
                Arc::clone(&replica_state),
                Arc::clone(&hw_advance_notify),
                LogDirRegistry::default(),
            ));
            Self {
                _dir: dir,
                log,
                tx,
                append_notify,
                replica_state,
                hw_advance_notify,
                writer,
            }
        }

        /// Closes the message channel and waits for the writer loop to exit.
        async fn shutdown(self) {
            drop(self.tx);
            self.writer.await.expect("writer join");
        }
    }

    #[tokio::test]
    async fn writer_appends_and_acks() {
        let h = Harness::start();
        let (ack, ack_rx) = oneshot::channel();
        h.tx.send(WriterMessage::Produce(ProduceJob {
            batch: sample_batch(3),
            ack,
        }))
        .await
        .expect("send job");
        let assigned = ack_rx.await.expect("ack recv").expect("append ok");
        assert!(assigned == 0);
        let (ack, ack_rx) = oneshot::channel();
        h.tx.send(WriterMessage::Produce(ProduceJob {
            batch: sample_batch(2),
            ack,
        }))
        .await
        .expect("send job 2");
        assert!(ack_rx.await.expect("ack recv 2").expect("append 2 ok") == 3);
        h.shutdown().await;
    }

    #[tokio::test]
    async fn writer_fires_notify_after_append() {
        let h = Harness::start();
        {
            // Register interest before sending so the notification cannot be missed.
            let waiter = h.append_notify.notified();
            tokio::pin!(waiter);
            let (ack, _ack_rx) = oneshot::channel();
            h.tx.send(WriterMessage::Produce(ProduceJob {
                batch: sample_batch(1),
                ack,
            }))
            .await
            .expect("send job");
            tokio::time::timeout(std::time::Duration::from_secs(1), waiter)
                .await
                .expect("notify did not fire");
        }
        h.shutdown().await;
    }

    #[tokio::test]
    async fn writer_handles_replicate_with_caller_offset() {
        let h = Harness::start();
        let mut batch = sample_batch(3);
        batch.base_offset = 0;
        let (ack, ack_rx) = oneshot::channel();
        h.tx.send(WriterMessage::Replicate { batch, ack })
            .await
            .expect("send replicate");
        ack_rx.await.expect("ack recv").expect("replicate ok");
        assert!(h.log.lock().expect("lock").log_end_offset() == 3);
        h.shutdown().await;
    }

    #[tokio::test]
    async fn writer_replicate_offset_mismatch_surfaces_error() {
        let h = Harness::start();
        let mut batch = sample_batch(1);
        batch.base_offset = 7;
        let (ack, ack_rx) = oneshot::channel();
        h.tx.send(WriterMessage::Replicate { batch, ack })
            .await
            .expect("send replicate");
        let err = ack_rx
            .await
            .expect("ack recv")
            .expect_err("expected offset mismatch");
        assert!(matches!(err, crate::error::BrokerError::Log(_)));
        assert!(h.log.lock().expect("lock").log_end_offset() == 0);
        h.shutdown().await;
    }

    #[tokio::test]
    async fn writer_truncate_drops_records() {
        let h = Harness::start();
        for _ in 0..2 {
            let (ack, ack_rx) = oneshot::channel();
            h.tx.send(WriterMessage::Produce(ProduceJob {
                batch: sample_batch(2),
                ack,
            }))
            .await
            .expect("send produce");
            ack_rx.await.expect("ack").expect("ok");
        }
        assert!(h.log.lock().expect("lock").log_end_offset() == 4);
        let (ack, ack_rx) = oneshot::channel();
        h.tx.send(WriterMessage::Truncate { offset: 0, ack })
            .await
            .expect("send truncate");
        ack_rx.await.expect("ack").expect("truncate ok");
        assert!(h.log.lock().expect("lock").log_end_offset() == 0);
        h.shutdown().await;
    }

    #[tokio::test]
    async fn writer_fires_hw_notify_after_produce_when_rf_one() {
        let h = Harness::start();
        // Installed before any message is sent, so the writer sees it on its first produce.
        h.replica_state.lock().await.install_isr(&[1], &[1], 1);
        {
            let waiter = h.hw_advance_notify.notified();
            tokio::pin!(waiter);
            let (ack, _ack_rx) = oneshot::channel();
            h.tx.send(WriterMessage::Produce(ProduceJob {
                batch: sample_batch(2),
                ack,
            }))
            .await
            .expect("send job");
            tokio::time::timeout(std::time::Duration::from_secs(1), waiter)
                .await
                .expect("hw_advance_notify did not fire");
        }
        assert!(h.replica_state.lock().await.hw == 2);
        h.shutdown().await;
    }

    #[tokio::test]
    async fn writer_set_log_config_swaps_config() {
        let h = Harness::start();
        let new_cfg = LogConfig {
            retention_ms: Some(std::time::Duration::from_mins(2)),
            ..LogConfig::default()
        };
        let (ack, ack_rx) = oneshot::channel();
        h.tx.send(WriterMessage::SetLogConfig {
            config: new_cfg.clone(),
            ack,
        })
        .await
        .expect("send");
        ack_rx.await.expect("ack");
        let observed = h.log.lock().expect("lock").config_snapshot();
        assert!(observed.retention_ms == new_cfg.retention_ms);
        h.shutdown().await;
    }

    #[tokio::test]
    async fn writer_trim_to_offset_advances_log_start() {
        let h = Harness::start();
        // The writer is idle (no messages sent yet), so appending directly is race-free.
        for _ in 0..2 {
            h.log
                .lock()
                .expect("lock")
                .append(&mut sample_batch(2))
                .expect("append");
        }
        let (ack, ack_rx) = oneshot::channel();
        h.tx.send(WriterMessage::TrimToOffset { new_start: 3, ack })
            .await
            .expect("send");
        let new_start = ack_rx.await.expect("ack").expect("trim ok");
        assert!(new_start >= 3);
        assert!(h.log.lock().expect("lock").log_start_offset() == new_start);
        h.shutdown().await;
    }

    #[tokio::test]
    async fn writer_does_not_advance_hw_when_followers_lagging() {
        let h = Harness::start();
        h.replica_state
            .lock()
            .await
            .install_isr(&[1, 2, 3], &[1, 2, 3], 1);
        let (ack, ack_rx) = oneshot::channel();
        h.tx.send(WriterMessage::Produce(ProduceJob {
            batch: sample_batch(3),
            ack,
        }))
        .await
        .expect("send job");
        ack_rx.await.expect("ack").expect("append ok");
        assert!(h.replica_state.lock().await.hw == 0);
        h.shutdown().await;
    }
}